package org.apache.flink.streaming.api.datastream;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.StreamGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.CoWindowFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;

/* loaded from: input_file:org/apache/flink/streaming/api/datastream/ConnectedDataStream.class */
/**
 * Pairs two {@link DataStream}s, whose element types may differ, so that
 * two-input ("co") operators can be applied to them.
 *
 * <p>The transformation methods ({@link #map}, {@link #flatMap}, {@link #reduce},
 * {@link #windowReduce}) each register a co-task on the stream graph and return the
 * resulting {@link SingleOutputStreamOperator}.
 *
 * @param <IN1> element type of the first input stream
 * @param <IN2> element type of the second input stream
 */
public class ConnectedDataStream<IN1, IN2> {
    /** Execution environment, taken from the first input stream. */
    protected StreamExecutionEnvironment environment;
    /** Stream graph, taken from the first input stream. */
    protected StreamGraph jobGraphBuilder;
    /** Copy of the first input stream. */
    protected DataStream<IN1> dataStream1;
    /** Copy of the second input stream. */
    protected DataStream<IN2> dataStream2;
    /** {@code true} only when BOTH inputs were {@link GroupedDataStream}s at construction. */
    protected boolean isGrouped;
    /** Key selector of the first input; {@code null} when {@link #isGrouped} is {@code false}. */
    protected KeySelector<IN1, ?> keySelector1;
    /** Key selector of the second input; {@code null} when {@link #isGrouped} is {@code false}. */
    protected KeySelector<IN2, ?> keySelector2;

    /**
     * Connects two streams. Both inputs are copied; the environment and stream graph are
     * taken from the first input.
     *
     * <p>The connected stream counts as grouped only if both inputs are
     * {@link GroupedDataStream}s. If just one of them is grouped, its key selector is
     * ignored and the result is ungrouped.
     *
     * <p>Note: the decompiler reported that this constructor was originally declared
     * {@code protected}; it is kept {@code public} here so existing callers keep compiling.
     *
     * @param dataStream  first input stream
     * @param dataStream2 second input stream
     */
    public ConnectedDataStream(DataStream<IN1> dataStream, DataStream<IN2> dataStream2) {
        this.jobGraphBuilder = dataStream.streamGraph;
        this.environment = dataStream.environment;
        this.dataStream1 = dataStream.copy();
        this.dataStream2 = dataStream2.copy();
        if ((dataStream instanceof GroupedDataStream) && (dataStream2 instanceof GroupedDataStream)) {
            this.isGrouped = true;
            // Parameterized casts instead of raw GroupedDataStream + unchecked KeySelector cast.
            this.keySelector1 = ((GroupedDataStream<IN1>) dataStream).keySelector;
            this.keySelector2 = ((GroupedDataStream<IN2>) dataStream2).keySelector;
        } else {
            this.isGrouped = false;
            this.keySelector1 = null;
            this.keySelector2 = null;
        }
    }

    /**
     * Copy constructor. The two inputs are obtained through {@link #getFirst()} and
     * {@link #getSecond()}, so they are fresh copies; grouping state is carried over as is.
     *
     * @param connectedDataStream the connected stream to copy
     */
    protected ConnectedDataStream(ConnectedDataStream<IN1, IN2> connectedDataStream) {
        this.jobGraphBuilder = connectedDataStream.jobGraphBuilder;
        this.environment = connectedDataStream.environment;
        this.dataStream1 = connectedDataStream.getFirst();
        this.dataStream2 = connectedDataStream.getSecond();
        this.isGrouped = connectedDataStream.isGrouped;
        this.keySelector1 = connectedDataStream.keySelector1;
        this.keySelector2 = connectedDataStream.keySelector2;
    }

    /**
     * Prepares a user function for shipping: runs {@link ClosureCleaner#clean} when the
     * closure cleaner is enabled in the execution config, then always calls
     * {@link ClosureCleaner#ensureSerializable}.
     *
     * @param f   the function object
     * @param <F> type of the function object
     * @return the same instance {@code f}
     */
    public <F> F clean(F f) {
        if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
            ClosureCleaner.clean(f, true);
        }
        ClosureCleaner.ensureSerializable(f);
        return f;
    }

    /**
     * @return the execution environment this connected stream belongs to
     */
    public StreamExecutionEnvironment getExecutionEnvironment() {
        return this.environment;
    }

    /**
     * @return a copy of the first input stream
     */
    public DataStream<IN1> getFirst() {
        return this.dataStream1.copy();
    }

    /**
     * @return a copy of the second input stream
     */
    public DataStream<IN2> getSecond() {
        return this.dataStream2.copy();
    }

    /**
     * @return type information of the first input stream
     */
    public TypeInformation<IN1> getInputType1() {
        return this.dataStream1.getType();
    }

    /**
     * @return type information of the second input stream
     */
    public TypeInformation<IN2> getInputType2() {
        return this.dataStream2.getType();
    }

    /**
     * Groups each input with {@code DataStream.groupBy(int)} and connects the results.
     *
     * @param i  key argument forwarded to the first input's {@code groupBy}
     * @param i2 key argument forwarded to the second input's {@code groupBy}
     * @return a new connected stream built from the two grouped inputs
     */
    public ConnectedDataStream<IN1, IN2> groupBy(int i, int i2) {
        return new ConnectedDataStream<>(this.dataStream1.groupBy(i), this.dataStream2.groupBy(i2));
    }

    /**
     * Groups each input with {@code DataStream.groupBy(int[])} and connects the results.
     *
     * @param iArr  key argument forwarded to the first input's {@code groupBy}
     * @param iArr2 key argument forwarded to the second input's {@code groupBy}
     * @return a new connected stream built from the two grouped inputs
     */
    public ConnectedDataStream<IN1, IN2> groupBy(int[] iArr, int[] iArr2) {
        return new ConnectedDataStream<>(this.dataStream1.groupBy(iArr), this.dataStream2.groupBy(iArr2));
    }

    /**
     * Groups each input with {@code DataStream.groupBy(String)} and connects the results.
     *
     * @param str  key argument forwarded to the first input's {@code groupBy}
     * @param str2 key argument forwarded to the second input's {@code groupBy}
     * @return a new connected stream built from the two grouped inputs
     */
    public ConnectedDataStream<IN1, IN2> groupBy(String str, String str2) {
        return new ConnectedDataStream<>(this.dataStream1.groupBy(str), this.dataStream2.groupBy(str2));
    }

    /**
     * Groups each input with {@code DataStream.groupBy(String[])} and connects the results.
     *
     * @param strArr  key argument forwarded to the first input's {@code groupBy}
     * @param strArr2 key argument forwarded to the second input's {@code groupBy}
     * @return a new connected stream built from the two grouped inputs
     */
    public ConnectedDataStream<IN1, IN2> groupBy(String[] strArr, String[] strArr2) {
        return new ConnectedDataStream<>(this.dataStream1.groupBy(strArr), this.dataStream2.groupBy(strArr2));
    }

    /**
     * Groups each input with the given key selector and connects the results.
     *
     * @param keySelector  key selector for the first input
     * @param keySelector2 key selector for the second input
     * @return a new connected stream built from the two grouped inputs
     */
    public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector, KeySelector<IN2, ?> keySelector2) {
        return new ConnectedDataStream<>(this.dataStream1.groupBy(keySelector), this.dataStream2.groupBy(keySelector2));
    }

    /**
     * Adds a "Co-Map" operator that wraps the given function in a {@link CoMapInvokable}.
     *
     * @param coMapFunction the user function; it is passed through {@link #clean(Object)}
     * @param <OUT>         output element type
     * @return the operator representing the transformed stream
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapFunction) {
        // Utils.getCallLocationName() is deliberately invoked from this frame, not a helper:
        // presumably it derives the name from the call stack (TODO confirm).
        TypeInformation<OUT> outType = TypeExtractor.getBinaryOperatorReturnType(coMapFunction, CoMapFunction.class, false, true, getInputType1(), getInputType2(), Utils.getCallLocationName(), true);
        CoMapInvokable<IN1, IN2, OUT> invokable = new CoMapInvokable<IN1, IN2, OUT>(clean(coMapFunction));
        return addCoFunction("Co-Map", outType, invokable);
    }

    /**
     * Adds a "Co-Flat Map" operator that wraps the given function in a
     * {@link CoFlatMapInvokable}.
     *
     * @param coFlatMapFunction the user function; it is passed through {@link #clean(Object)}
     * @param <OUT>             output element type
     * @return the operator representing the transformed stream
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(CoFlatMapFunction<IN1, IN2, OUT> coFlatMapFunction) {
        // Call-location lookup kept in this frame; see map().
        TypeInformation<OUT> outType = TypeExtractor.getBinaryOperatorReturnType(coFlatMapFunction, CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(), Utils.getCallLocationName(), true);
        CoFlatMapInvokable<IN1, IN2, OUT> invokable = new CoFlatMapInvokable<IN1, IN2, OUT>(clean(coFlatMapFunction));
        return addCoFunction("Co-Flat Map", outType, invokable);
    }

    /**
     * Adds a "Co-Reduce" operator. The invokable is chosen by
     * {@link #getReduceInvokable(CoReduceFunction)}: grouped when both inputs were grouped,
     * plain otherwise.
     *
     * @param coReduceFunction the user function; it is passed through {@link #clean(Object)}
     * @param <OUT>            output element type
     * @return the operator representing the transformed stream
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReduceFunction) {
        // Call-location lookup kept in this frame; see map().
        TypeInformation<OUT> outType = TypeExtractor.getBinaryOperatorReturnType(coReduceFunction, CoReduceFunction.class, false, true, getInputType1(), getInputType2(), Utils.getCallLocationName(), true);
        // The function is cleaned here and again inside getReduceInvokable(); the outer call is
        // retained so behavior does not depend on how a subclass overrides that method.
        CoInvokable<IN1, IN2, OUT> invokable = getReduceInvokable(clean(coReduceFunction));
        return addCoFunction("Co-Reduce", outType, invokable);
    }

    /**
     * Adds a "Co-Window" operator using {@link SystemTimestamp#getWrapper()} as the timestamp
     * wrapper for both inputs.
     *
     * @param coWindowFunction the user function
     * @param j                window size; must be at least 1
     * @param j2               slide interval; must be at least 1
     * @param <OUT>            output element type
     * @return the operator representing the transformed stream
     * @throws IllegalArgumentException if {@code j} or {@code j2} is smaller than 1
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long j, long j2) {
        return windowReduce(coWindowFunction, j, j2, SystemTimestamp.<IN1>getWrapper(), SystemTimestamp.<IN2>getWrapper());
    }

    /**
     * Adds a "Co-Window" operator whose output type is extracted from the function.
     * The window arguments are validated before any type extraction takes place.
     *
     * @param coWindowFunction  the user function; it is passed through {@link #clean(Object)}
     * @param j                 window size; must be at least 1
     * @param j2                slide interval; must be at least 1
     * @param timestampWrapper  timestamp wrapper for the first input
     * @param timestampWrapper2 timestamp wrapper for the second input
     * @param <OUT>             output element type
     * @return the operator representing the transformed stream
     * @throws IllegalArgumentException if {@code j} or {@code j2} is smaller than 1
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long j, long j2, TimestampWrapper<IN1> timestampWrapper, TimestampWrapper<IN2> timestampWrapper2) {
        checkWindowArguments(j, j2);
        // Call-location lookup kept in this frame; see map().
        TypeInformation<OUT> outType = TypeExtractor.getBinaryOperatorReturnType(coWindowFunction, CoWindowFunction.class, false, true, getInputType1(), getInputType2(), Utils.getCallLocationName(), true);
        return addCoWindowFunction(coWindowFunction, outType, j, j2, timestampWrapper, timestampWrapper2);
    }

    /**
     * Builds the invokable used by {@link #reduce(CoReduceFunction)}.
     *
     * @param coReduceFunction the user function; it is passed through {@link #clean(Object)}
     * @param <OUT>            output element type
     * @return a {@link CoGroupedReduceInvokable} using both key selectors when
     *         {@link #isGrouped} is set, otherwise a {@link CoReduceInvokable}
     */
    protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReduceFunction) {
        if (this.isGrouped) {
            return new CoGroupedReduceInvokable<IN1, IN2, OUT>(clean(coReduceFunction), this.keySelector1, this.keySelector2);
        }
        return new CoReduceInvokable<IN1, IN2, OUT>(clean(coReduceFunction));
    }

    /**
     * Adds a "Co-Window" operator with an explicitly supplied output type.
     *
     * @param coWindowFunction  the user function; it is passed through {@link #clean(Object)}
     * @param typeInformation   type information of the operator's output
     * @param j                 window size; must be at least 1
     * @param j2                slide interval; must be at least 1
     * @param timestampWrapper  timestamp wrapper for the first input
     * @param timestampWrapper2 timestamp wrapper for the second input
     * @param <OUT>             output element type
     * @return the operator representing the transformed stream
     * @throws IllegalArgumentException if {@code j} or {@code j2} is smaller than 1
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> typeInformation, long j, long j2, TimestampWrapper<IN1> timestampWrapper, TimestampWrapper<IN2> timestampWrapper2) {
        checkWindowArguments(j, j2);
        return addCoWindowFunction(coWindowFunction, typeInformation, j, j2, timestampWrapper, timestampWrapper2);
    }

    /**
     * Creates the output operator for a co-invokable, registers it as a co-task on the stream
     * graph with the environment's parallelism, and connects both inputs to it.
     *
     * @param str             operator name
     * @param typeInformation type information of the operator's output
     * @param coInvokable     the invokable executing the two-input function
     * @param <OUT>           output element type
     * @return the operator representing the transformed stream
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String str, TypeInformation<OUT> typeInformation, CoInvokable<IN1, IN2, OUT> coInvokable) {
        SingleOutputStreamOperator<OUT, ?> singleOutputStreamOperator = new SingleOutputStreamOperator<>(this.environment, str, typeInformation, coInvokable);
        this.dataStream1.streamGraph.addCoTask(singleOutputStreamOperator.getId(), coInvokable, getInputType1(), getInputType2(), typeInformation, str, this.environment.getParallelism());
        // The trailing 1 / 2 presumably identifies which input of the co-task the edge feeds
        // (TODO confirm against DataStream.connectGraph).
        this.dataStream1.connectGraph(this.dataStream1, singleOutputStreamOperator.getId(), 1);
        this.dataStream1.connectGraph(this.dataStream2, singleOutputStreamOperator.getId(), 2);
        return singleOutputStreamOperator;
    }

    /**
     * @return a copy of this connected stream, created through the copy constructor
     */
    protected ConnectedDataStream<IN1, IN2> copy() {
        return new ConnectedDataStream<>(this);
    }

    /** Rejects window sizes and slide intervals smaller than 1. */
    private static void checkWindowArguments(long windowSize, long slideInterval) {
        if (windowSize < 1) {
            throw new IllegalArgumentException("Window size must be positive");
        }
        if (slideInterval < 1) {
            throw new IllegalArgumentException("Slide interval must be positive");
        }
    }

    /** Wraps an already validated window function in a CoWindowInvokable and registers it as "Co-Window". */
    private <OUT> SingleOutputStreamOperator<OUT, ?> addCoWindowFunction(CoWindowFunction<IN1, IN2, OUT> windowFunction, TypeInformation<OUT> outType, long windowSize, long slideInterval, TimestampWrapper<IN1> firstTimestamp, TimestampWrapper<IN2> secondTimestamp) {
        CoWindowInvokable<IN1, IN2, OUT> invokable = new CoWindowInvokable<IN1, IN2, OUT>(clean(windowFunction), windowSize, slideInterval, firstTimestamp, secondTimestamp);
        return addCoFunction("Co-Window", outType, invokable);
    }
}
