package org.apache.flink.streaming.api.datastream;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.WindowMapFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedActiveDiscretizer;
import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedStreamDiscretizer;
import org.apache.flink.streaming.api.invokable.operator.windowing.GroupedWindowBufferInvokable;
import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowBufferInvokable;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
import org.apache.flink.streaming.api.windowing.WindowEvent;
import org.apache.flink.streaming.api.windowing.WindowUtils;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimeGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.PreAggregator;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;

/* loaded from: input_file:org/apache/flink/streaming/api/datastream/WindowedDataStream.class */
/**
 * Pairs a {@link DataStream} with a trigger/eviction configuration and offers
 * window-level transformations (reduce, fold, map and the built-in
 * aggregations) that discretize the stream into {@link StreamWindow}s.
 *
 * <p>The configuration methods ({@link #every}, {@code groupBy},
 * {@link #local()}) never modify the receiver; each returns a modified
 * {@link #copy()}.
 *
 * @param <OUT> element type of the underlying stream
 */
public class WindowedDataStream<OUT> {
    /** Underlying stream; constructors that receive a stream store a copy of it. */
    protected DataStream<OUT> dataStream;

    /** Set by {@link #local()}; when true the discretizer runs with the environment parallelism. */
    protected boolean isLocal;

    /**
     * Key selector inherited from a {@link GroupedDataStream} input. When non-null the grouped
     * discretizer and grouped window-buffer invokables are used.
     */
    protected KeySelector<OUT, ?> discretizerKey;

    /** Key selector set through {@code groupBy}; handed to grouped pre-reducers and to the resulting stream. */
    protected KeySelector<OUT, ?> groupByKey;

    /** Helper that supplies the trigger policy; takes precedence over {@link #userTrigger}. */
    protected WindowingHelper<OUT> triggerHelper;

    /** Helper that supplies the eviction policy; takes precedence over {@link #userEvicter}. */
    protected WindowingHelper<OUT> evictionHelper;

    /** Explicit trigger policy, used when no {@link #triggerHelper} is set. */
    protected TriggerPolicy<OUT> userTrigger;

    /** Explicit eviction policy, consulted when no {@link #evictionHelper} is set. */
    protected EvictionPolicy<OUT> userEvicter;

    /**
     * Creates a windowed stream whose trigger is described by a windowing helper.
     *
     * <p>NOTE(review): the decompiler reports that this constructor was originally
     * {@code protected}.
     *
     * @param dataStream      input stream (copied); if it is a {@link GroupedDataStream} its key
     *                        selector becomes the discretizer key
     * @param windowingHelper helper used as the trigger helper
     */
    public WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT> windowingHelper) {
        this.dataStream = dataStream.copy();
        this.triggerHelper = windowingHelper;
        inheritDiscretizerKey(dataStream);
    }

    /**
     * Creates a windowed stream from explicit trigger and eviction policies.
     *
     * <p>NOTE(review): the decompiler reports that this constructor was originally
     * {@code protected}.
     *
     * @param dataStream     input stream (copied); if it is a {@link GroupedDataStream} its key
     *                       selector becomes the discretizer key
     * @param triggerPolicy  user supplied trigger policy
     * @param evictionPolicy user supplied eviction policy
     */
    public WindowedDataStream(DataStream<OUT> dataStream, TriggerPolicy<OUT> triggerPolicy, EvictionPolicy<OUT> evictionPolicy) {
        this.dataStream = dataStream.copy();
        this.userTrigger = triggerPolicy;
        this.userEvicter = evictionPolicy;
        inheritDiscretizerKey(dataStream);
    }

    /**
     * Copy constructor: copies the underlying stream and takes over every configuration field.
     *
     * @param windowedDataStream instance to copy
     */
    protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
        this.dataStream = windowedDataStream.dataStream.copy();
        this.discretizerKey = windowedDataStream.discretizerKey;
        this.groupByKey = windowedDataStream.groupByKey;
        this.triggerHelper = windowedDataStream.triggerHelper;
        this.evictionHelper = windowedDataStream.evictionHelper;
        this.userTrigger = windowedDataStream.userTrigger;
        this.userEvicter = windowedDataStream.userEvicter;
        this.isLocal = windowedDataStream.isLocal;
    }

    /**
     * Creates an instance with no underlying stream and no policies; all fields keep their
     * default values. Presumably intended for subclasses that manage their own state.
     */
    public WindowedDataStream() {
    }

    /** Adopts the key selector of {@code source} as discretizer key when it is a grouped stream. */
    private void inheritDiscretizerKey(DataStream<OUT> source) {
        if (source instanceof GroupedDataStream) {
            this.discretizerKey = ((GroupedDataStream) source).keySelector;
        }
    }

    /**
     * Returns a copy in which the current trigger helper becomes the eviction helper and the
     * given helper becomes the new trigger helper. If an eviction helper is already set the
     * copy is returned unchanged.
     *
     * @param windowingHelper helper to install as trigger
     * @return the (possibly modified) copy
     */
    public WindowedDataStream<OUT> every(WindowingHelper windowingHelper) {
        WindowedDataStream<OUT> result = copy();
        if (result.evictionHelper == null) {
            result.evictionHelper = result.triggerHelper;
            result.triggerHelper = windowingHelper;
        }
        return result;
    }

    /**
     * Returns a copy grouped by the given field positions. Array-typed streams are keyed with an
     * {@code ArrayKeySelector}; all other types go through expression keys.
     *
     * @param iArr field positions to group by
     * @return grouped copy
     */
    public WindowedDataStream<OUT> groupBy(int... iArr) {
        TypeInformation<OUT> type = getType();
        if (type instanceof BasicArrayTypeInfo || type instanceof PrimitiveArrayTypeInfo) {
            return groupBy(new KeySelectorUtil.ArrayKeySelector(iArr));
        }
        return groupBy((Keys) new Keys.ExpressionKeys(iArr, getType()));
    }

    /**
     * Returns a copy grouped by the given field expressions.
     *
     * @param strArr field expressions to group by
     * @return grouped copy
     */
    public WindowedDataStream<OUT> groupBy(String... strArr) {
        return groupBy((Keys) new Keys.ExpressionKeys(strArr, getType()));
    }

    /**
     * Returns a copy whose group-by key is the given selector.
     *
     * @param keySelector selector to group by
     * @return grouped copy
     */
    public WindowedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
        WindowedDataStream<OUT> result = copy();
        result.groupByKey = keySelector;
        return result;
    }

    /** Converts {@code keys} into a cleaned key selector and groups by it. */
    private WindowedDataStream<OUT> groupBy(Keys<OUT> keys) {
        return groupBy((KeySelector) clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig())));
    }

    /**
     * Returns a copy flagged as local, which makes the discretizer use the environment
     * parallelism instead of 1.
     *
     * @return local copy
     */
    public WindowedDataStream<OUT> local() {
        WindowedDataStream<OUT> result = copy();
        result.isLocal = true;
        return result;
    }

    /**
     * Discretizes the stream without any transformation, using a {@link BasicWindowBuffer}.
     *
     * @return the stream of windows
     */
    public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
        return discretize(WindowUtils.WindowTransformation.NONE, new BasicWindowBuffer()).getDiscretizedStream();
    }

    /**
     * Returns the underlying, non-windowed stream.
     *
     * @return the wrapped data stream
     */
    public DataStream<OUT> flatten() {
        return this.dataStream;
    }

    /**
     * Reduces every window with the given function.
     *
     * <p>Time-only policies on a non-grouped input with parallelism above 1 are routed through
     * {@link #timeReduce}. Otherwise a window buffer is chosen for the policy combination; when
     * that buffer is a {@link PreAggregator} no further reduce is appended.
     *
     * @param reduceFunction reduce function applied per window
     * @return the reduced, discretized stream
     */
    public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
        if (WindowUtils.isTimeOnly(getTrigger(), getEviction())
                && this.discretizerKey == null
                && this.dataStream.getParallelism() > 1) {
            return timeReduce(reduceFunction);
        }

        WindowUtils.WindowTransformation transformation =
                WindowUtils.WindowTransformation.REDUCEWINDOW.with((Function) clean(reduceFunction));
        WindowBuffer<OUT> buffer = getWindowBuffer(transformation);
        DiscretizedStream<OUT> discretized = discretize(transformation, buffer);
        if (buffer instanceof PreAggregator) {
            // The buffer already emits reduced windows.
            return discretized;
        }
        return discretized.reduceWindow(reduceFunction);
    }

    /**
     * Folds every window starting from the given initial value.
     *
     * @param r               initial value of the fold
     * @param foldFunction    fold function applied per window
     * @param typeInformation type information of the fold result
     * @param <R>             result type
     * @return the folded, discretized stream
     */
    public <R> DiscretizedStream<R> foldWindow(R r, FoldFunction<OUT, R> foldFunction, TypeInformation<R> typeInformation) {
        return discretize(WindowUtils.WindowTransformation.FOLDWINDOW.with((Function) clean(foldFunction)), new BasicWindowBuffer()).foldWindow(r, foldFunction, typeInformation);
    }

    /**
     * Folds every window starting from the given initial value; the result type is extracted
     * from the fold function.
     *
     * @param r            initial value of the fold
     * @param foldFunction fold function applied per window
     * @param <R>          result type
     * @return the folded, discretized stream
     */
    public <R> DiscretizedStream<R> foldWindow(R r, FoldFunction<OUT, R> foldFunction) {
        return foldWindow(r, foldFunction, TypeExtractor.getFoldReturnTypes((FoldFunction) clean(foldFunction), getType()));
    }

    /**
     * Applies the given function to every window.
     *
     * @param windowMapFunction function applied per window
     * @param <R>               result type
     * @return the mapped windowed stream
     */
    public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
        return discretize(WindowUtils.WindowTransformation.MAPWINDOW.with((Function) clean(windowMapFunction)), new BasicWindowBuffer()).mapWindow((WindowMapFunction) windowMapFunction);
    }

    /**
     * Applies the given function to every window, with an explicit result type.
     *
     * @param windowMapFunction function applied per window
     * @param typeInformation   type information of the result
     * @param <R>               result type
     * @return the mapped windowed stream
     */
    public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction, TypeInformation<R> typeInformation) {
        // Clean the function here as well, so both mapWindow overloads treat user code alike.
        return discretize(WindowUtils.WindowTransformation.MAPWINDOW.with((Function) clean(windowMapFunction)), new BasicWindowBuffer()).mapWindow((WindowMapFunction) windowMapFunction, (TypeInformation) typeInformation);
    }

    /**
     * Appends the discretizer and the window-buffer operator to the underlying stream, both
     * with the parallelism from {@link #getDiscretizerParallelism}, and wraps the result.
     */
    private DiscretizedStream<OUT> discretize(WindowUtils.WindowTransformation windowTransformation, WindowBuffer<OUT> windowBuffer) {
        StreamInvokable<OUT, WindowEvent<OUT>> discretizer = getDiscretizer();
        StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> bufferInvokable = getBufferInvokable(windowBuffer);
        TupleTypeInfo windowEventType = new TupleTypeInfo(WindowEvent.class, new TypeInformation[]{getType(), BasicTypeInfo.INT_TYPE_INFO});
        int parallelism = getDiscretizerParallelism(windowTransformation);

        return new DiscretizedStream<>(
                this.dataStream
                        .transform(discretizer.getClass().getSimpleName(), windowEventType, discretizer)
                        .setParallelism(parallelism)
                        .transform(windowBuffer.getClass().getSimpleName(), new StreamWindowTypeInfo(getType()), bufferInvokable)
                        .setParallelism(parallelism),
                this.groupByKey, windowTransformation, false);
    }

    /**
     * Returns the environment parallelism for local streams, for reduce transformations with a
     * parallel policy, and for keyed discretization; otherwise 1.
     */
    private int getDiscretizerParallelism(WindowUtils.WindowTransformation windowTransformation) {
        // Checks stay in this order so the policies are only resolved when isLocal is false.
        if (this.isLocal) {
            return this.dataStream.environment.getParallelism();
        }
        if (windowTransformation == WindowUtils.WindowTransformation.REDUCEWINDOW
                && WindowUtils.isParallelPolicy(getTrigger(), getEviction(), this.dataStream.getParallelism())) {
            return this.dataStream.environment.getParallelism();
        }
        if (this.discretizerKey != null) {
            return this.dataStream.environment.getParallelism();
        }
        return 1;
    }

    /**
     * Reduce path used for time-only policies: the window buffer is configured to emit empty
     * windows with sequential ids, and the input is grouped by the group-by key when one is set.
     *
     * <p>The grouping is applied to a copy, so this instance's {@link #dataStream} is left
     * untouched.
     *
     * @param reduceFunction reduce function applied per window
     * @return the reduced, discretized stream
     */
    protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
        WindowUtils.WindowTransformation transformation =
                WindowUtils.WindowTransformation.REDUCEWINDOW.with((Function) clean(reduceFunction));
        WindowBuffer<OUT> buffer = getWindowBuffer(transformation).emitEmpty().sequentialID();

        WindowedDataStream<OUT> source = this;
        if (this.groupByKey != null) {
            source = copy();
            source.dataStream = this.dataStream.groupBy(this.groupByKey);
        }
        return source.discretize(transformation, buffer).timeReduce(reduceFunction, buffer instanceof PreAggregator);
    }

    /**
     * Picks the discretizer: plain when no discretizer key is set, otherwise the active or the
     * regular grouped variant depending on whether the trigger is a {@link CentralActiveTrigger}.
     */
    private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
        if (this.discretizerKey == null) {
            return new StreamDiscretizer(getTrigger(), getEviction());
        }
        // Resolve the trigger once so the instanceof test and the cast see the same object.
        TriggerPolicy<OUT> trigger = getTrigger();
        if (trigger instanceof CentralActiveTrigger) {
            return new GroupedActiveDiscretizer(this.discretizerKey, (CentralActiveTrigger) trigger, (CloneableEvictionPolicy) getEviction());
        }
        return new GroupedStreamDiscretizer(this.discretizerKey, (CloneableTriggerPolicy) trigger, (CloneableEvictionPolicy) getEviction());
    }

    /** Wraps the window buffer in a plain or, when a discretizer key is set, grouped invokable. */
    private StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> getBufferInvokable(WindowBuffer<OUT> windowBuffer) {
        if (this.discretizerKey == null) {
            return new WindowBufferInvokable(windowBuffer);
        }
        return new GroupedWindowBufferInvokable(windowBuffer, this.discretizerKey);
    }

    /**
     * Chooses the window buffer for a transformation. For reduce transformations a pre-reducer
     * matching the policy combination (tumbling, sliding count/time, jumping count/time) is
     * returned, in its grouped form when a group-by key is set. Everything else gets a
     * {@link BasicWindowBuffer}.
     */
    private WindowBuffer<OUT> getWindowBuffer(WindowUtils.WindowTransformation windowTransformation) {
        TriggerPolicy<OUT> trigger = getTrigger();
        EvictionPolicy<OUT> eviction = getEviction();

        if (windowTransformation != WindowUtils.WindowTransformation.REDUCEWINDOW) {
            return new BasicWindowBuffer();
        }

        boolean ungrouped = this.groupByKey == null;

        if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
            if (ungrouped) {
                return new TumblingPreReducer(windowTransformation.getUDF(), getType().createSerializer(getExecutionConfig()));
            }
            return new TumblingGroupedPreReducer(windowTransformation.getUDF(), this.groupByKey, getType().createSerializer(getExecutionConfig()));
        }

        if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
            if (ungrouped) {
                return new SlidingCountPreReducer((ReduceFunction) clean(windowTransformation.getUDF()), this.dataStream.getType().createSerializer(getExecutionConfig()), WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger), ((CountTriggerPolicy) trigger).getStart());
            }
            return new SlidingCountGroupedPreReducer((ReduceFunction) clean(windowTransformation.getUDF()), this.dataStream.getType().createSerializer(getExecutionConfig()), this.groupByKey, WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger), ((CountTriggerPolicy) trigger).getStart());
        }

        if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
            if (ungrouped) {
                return new SlidingTimePreReducer(windowTransformation.getUDF(), this.dataStream.getType().createSerializer(getExecutionConfig()), WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger), WindowUtils.getTimeStampWrapper(trigger));
            }
            return new SlidingTimeGroupedPreReducer(windowTransformation.getUDF(), this.dataStream.getType().createSerializer(getExecutionConfig()), this.groupByKey, WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger), WindowUtils.getTimeStampWrapper(trigger));
        }

        if (WindowUtils.isJumpingCountPolicy(trigger, eviction)) {
            if (ungrouped) {
                return new JumpingCountPreReducer(windowTransformation.getUDF(), getType().createSerializer(getExecutionConfig()), WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
            }
            return new JumpingCountGroupedPreReducer(windowTransformation.getUDF(), this.groupByKey, getType().createSerializer(getExecutionConfig()), WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
        }

        if (WindowUtils.isJumpingTimePolicy(trigger, eviction)) {
            if (ungrouped) {
                return new JumpingTimePreReducer(windowTransformation.getUDF(), getType().createSerializer(getExecutionConfig()), WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction), WindowUtils.getTimeStampWrapper(trigger));
            }
            return new JumpingTimeGroupedPreReducer(windowTransformation.getUDF(), this.groupByKey, getType().createSerializer(getExecutionConfig()), WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction), WindowUtils.getTimeStampWrapper(trigger));
        }

        return new BasicWindowBuffer();
    }

    /**
     * Sums the field at the given position per window.
     *
     * @param i field position
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> sum(int i) {
        return aggregate((AggregationFunction) SumAggregator.getSumFunction(i, getClassAtPos(i), getType()));
    }

    /**
     * Sums the field named by the given expression per window.
     *
     * @param str field expression
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> sum(String str) {
        return aggregate((AggregationFunction) SumAggregator.getSumFunction(str, getType(), getExecutionConfig()));
    }

    /**
     * Applies the {@code MIN} aggregation on the field at the given position per window.
     *
     * @param i field position
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> min(int i) {
        return aggregate(ComparableAggregator.getAggregator(i, getType(), AggregationFunction.AggregationType.MIN));
    }

    /**
     * Applies the {@code MIN} aggregation on the field named by the given expression per window.
     *
     * @param str field expression
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> min(String str) {
        return aggregate(ComparableAggregator.getAggregator(str, getType(), AggregationFunction.AggregationType.MIN, false, getExecutionConfig()));
    }

    /**
     * Shorthand for {@link #minBy(int, boolean)} with {@code true} as second argument.
     *
     * @param i field position
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> minBy(int i) {
        return minBy(i, true);
    }

    /**
     * Shorthand for {@link #minBy(String, boolean)} with {@code true} as second argument.
     *
     * @param str field expression
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> minBy(String str) {
        return minBy(str, true);
    }

    /**
     * Applies the {@code MINBY} aggregation on the field at the given position per window.
     *
     * @param i field position
     * @param z flag forwarded to the aggregator; presumably selects whether the first of
     *          several equal elements is kept (TODO confirm against ComparableAggregator)
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> minBy(int i, boolean z) {
        return aggregate(ComparableAggregator.getAggregator(i, getType(), AggregationFunction.AggregationType.MINBY, z));
    }

    /**
     * Applies the {@code MINBY} aggregation on the field named by the given expression per window.
     *
     * @param str field expression
     * @param z   flag forwarded to the aggregator; presumably selects whether the first of
     *            several equal elements is kept (TODO confirm against ComparableAggregator)
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> minBy(String str, boolean z) {
        return aggregate(ComparableAggregator.getAggregator(str, getType(), AggregationFunction.AggregationType.MINBY, z, getExecutionConfig()));
    }

    /**
     * Applies the {@code MAX} aggregation on the field at the given position per window.
     *
     * @param i field position
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> max(int i) {
        return aggregate(ComparableAggregator.getAggregator(i, getType(), AggregationFunction.AggregationType.MAX));
    }

    /**
     * Applies the {@code MAX} aggregation on the field named by the given expression per window.
     *
     * @param str field expression
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> max(String str) {
        return aggregate(ComparableAggregator.getAggregator(str, getType(), AggregationFunction.AggregationType.MAX, false, getExecutionConfig()));
    }

    /**
     * Shorthand for {@link #maxBy(int, boolean)} with {@code true} as second argument.
     *
     * @param i field position
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> maxBy(int i) {
        return maxBy(i, true);
    }

    /**
     * Shorthand for {@link #maxBy(String, boolean)} with {@code true} as second argument.
     *
     * @param str field expression
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> maxBy(String str) {
        return maxBy(str, true);
    }

    /**
     * Applies the {@code MAXBY} aggregation on the field at the given position per window.
     *
     * @param i field position
     * @param z flag forwarded to the aggregator; presumably selects whether the first of
     *          several equal elements is kept (TODO confirm against ComparableAggregator)
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> maxBy(int i, boolean z) {
        return aggregate(ComparableAggregator.getAggregator(i, getType(), AggregationFunction.AggregationType.MAXBY, z));
    }

    /**
     * Applies the {@code MAXBY} aggregation on the field named by the given expression per window.
     *
     * @param str field expression
     * @param z   flag forwarded to the aggregator; presumably selects whether the first of
     *            several equal elements is kept (TODO confirm against ComparableAggregator)
     * @return the aggregated stream
     */
    public WindowedDataStream<OUT> maxBy(String str, boolean z) {
        return aggregate(ComparableAggregator.getAggregator(str, getType(), AggregationFunction.AggregationType.MAXBY, z, getExecutionConfig()));
    }

    /** Runs the aggregation function as a window reduce. */
    private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> aggregationFunction) {
        return reduceWindow(aggregationFunction);
    }

    /**
     * Resolves the trigger policy: the trigger helper's policy when a helper is set, otherwise
     * the user supplied trigger.
     *
     * @return the trigger policy
     * @throws RuntimeException if neither a trigger helper nor a user trigger is set
     */
    protected TriggerPolicy<OUT> getTrigger() {
        if (this.triggerHelper != null) {
            return this.triggerHelper.toTrigger();
        }
        if (this.userTrigger != null) {
            return this.userTrigger;
        }
        throw new RuntimeException("Trigger must not be null");
    }

    /**
     * Resolves the eviction policy.
     *
     * <p>An eviction helper wins if present. Otherwise a user evicter that is set and is not a
     * {@link TumblingEvictionPolicy} is returned as is. In the remaining case the eviction is
     * derived from the trigger helper when that helper is a {@link Time}, and is a new
     * {@link TumblingEvictionPolicy} otherwise.
     *
     * @return the eviction policy
     */
    protected EvictionPolicy<OUT> getEviction() {
        if (this.evictionHelper != null) {
            return this.evictionHelper.toEvict();
        }
        if (this.userEvicter != null && !(this.userEvicter instanceof TumblingEvictionPolicy)) {
            return this.userEvicter;
        }
        if (this.triggerHelper instanceof Time) {
            return this.triggerHelper.toEvict();
        }
        return new TumblingEvictionPolicy();
    }

    /**
     * Runs the closure cleaner on the given object when it is enabled in the execution config,
     * and in every case verifies that the object is serializable.
     *
     * @param f   object (typically a user function) to clean
     * @param <F> type of the object
     * @return the same instance that was passed in
     */
    public <F> F clean(F f) {
        if (getExecutionConfig().isClosureCleanerEnabled()) {
            ClosureCleaner.clean(f, true);
        }
        ClosureCleaner.ensureSerializable(f);
        return f;
    }

    /**
     * Tells whether a group-by key has been set.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code protected}.
     *
     * @return {@code true} if {@link #groupByKey} is non-null
     */
    public boolean isGrouped() {
        return this.groupByKey != null;
    }

    /**
     * Returns the type information of the underlying stream.
     *
     * @return element type information
     */
    public TypeInformation<OUT> getType() {
        return this.dataStream.getType();
    }

    /**
     * Returns the execution config of the underlying stream.
     *
     * @return the execution config
     */
    public ExecutionConfig getExecutionConfig() {
        return this.dataStream.getExecutionConfig();
    }

    /**
     * Returns the class of the field at the given position, as reported by the underlying stream.
     *
     * @param i field position
     * @return class of that field
     */
    protected Class<?> getClassAtPos(int i) {
        return this.dataStream.getClassAtPos(i);
    }

    /**
     * Creates a copy of this windowed stream through the copy constructor.
     *
     * @return the copy
     */
    protected WindowedDataStream<OUT> copy() {
        return new WindowedDataStream<>(this);
    }
}
