package org.apache.flink.streaming.api.datastream;

import java.io.Serializable;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.CoWindowFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeWrapper;

/* loaded from: input_file:org/apache/flink/streaming/api/datastream/ConnectedDataStream.class */
/**
 * Pairs two {@link DataStream}s, whose element types may differ, so that
 * two-input ("co") operators such as {@link #map}, {@link #flatMap},
 * {@link #reduce} and {@link #windowReduce} can be applied to them together.
 *
 * <p>Both input streams are copied on construction, and the accessors
 * {@link #getFirst()} / {@link #getSecond()} hand out further copies.
 *
 * @param <IN1> element type of the first input stream
 * @param <IN2> element type of the second input stream
 */
public class ConnectedDataStream<IN1, IN2> {
    /** Execution environment taken from the first input stream. */
    protected StreamExecutionEnvironment environment;
    /** Job graph builder taken from the first input stream. */
    protected JobGraphBuilder jobGraphBuilder;
    /** Copy of the first input stream. */
    protected DataStream<IN1> dataStream1;
    /** Copy of the second input stream. */
    protected DataStream<IN2> dataStream2;
    /** {@code true} only when BOTH inputs were {@link GroupedDataStream}s. */
    protected boolean isGrouped;
    /** Key position of the first input; 0 when the pair is not grouped. */
    protected int keyPosition1;
    /** Key position of the second input; 0 when the pair is not grouped. */
    protected int keyPosition2;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Connects two data streams. The environment and job graph builder are
     * taken from the first stream; both streams are copied.
     *
     * <p>The pair is treated as grouped only if both arguments are
     * {@link GroupedDataStream}s, in which case their key positions are
     * recorded. Otherwise the pair is ungrouped and both key positions are 0.
     *
     * @param dataStream  first input stream
     * @param dataStream2 second input stream
     */
    public ConnectedDataStream(DataStream<IN1> dataStream, DataStream<IN2> dataStream2) {
        this.jobGraphBuilder = dataStream.jobGraphBuilder;
        this.environment = dataStream.environment;
        this.dataStream1 = dataStream.copy();
        this.dataStream2 = dataStream2.copy();

        boolean bothGrouped =
                (dataStream instanceof GroupedDataStream) && (dataStream2 instanceof GroupedDataStream);
        this.isGrouped = bothGrouped;
        if (bothGrouped) {
            this.keyPosition1 = ((GroupedDataStream) dataStream).keyPosition;
            this.keyPosition2 = ((GroupedDataStream) dataStream2).keyPosition;
        } else {
            this.keyPosition1 = 0;
            this.keyPosition2 = 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Copy constructor. Shares the environment and job graph builder of the
     * source, takes fresh copies of its two input streams (via
     * {@link #getFirst()} / {@link #getSecond()}) and carries over its
     * grouping state.
     *
     * @param connectedDataStream the connected stream to copy
     */
    public ConnectedDataStream(ConnectedDataStream<IN1, IN2> connectedDataStream) {
        this.jobGraphBuilder = connectedDataStream.jobGraphBuilder;
        this.environment = connectedDataStream.environment;
        this.dataStream1 = connectedDataStream.getFirst();
        this.dataStream2 = connectedDataStream.getSecond();
        this.isGrouped = connectedDataStream.isGrouped;
        this.keyPosition1 = connectedDataStream.keyPosition1;
        this.keyPosition2 = connectedDataStream.keyPosition2;
    }

    /**
     * @return a copy of the first input stream
     */
    public DataStream<IN1> getFirst() {
        return this.dataStream1.copy();
    }

    /**
     * @return a copy of the second input stream
     */
    public DataStream<IN2> getSecond() {
        return this.dataStream2.copy();
    }

    /**
     * @return the output type information of the first input stream
     */
    public TypeInformation<IN1> getInputType1() {
        return this.dataStream1.getOutputType();
    }

    /**
     * @return the output type information of the second input stream
     */
    public TypeInformation<IN2> getInputType2() {
        return this.dataStream2.getOutputType();
    }

    /**
     * Groups each input stream by the field at the given position and
     * connects the two grouped streams.
     *
     * @param keyPosition1 field position used to group the first input
     * @param keyPosition2 field position used to group the second input
     * @return a new connected stream built from the two grouped inputs
     * @throws IllegalArgumentException if either position is negative
     */
    public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
        if (keyPosition1 < 0 || keyPosition2 < 0) {
            throw new IllegalArgumentException("The position of the field must be non-negative");
        }
        return new ConnectedDataStream<>(
                this.dataStream1.groupBy(keyPosition1), this.dataStream2.groupBy(keyPosition2));
    }

    /**
     * Creates a {@link CoBatchedDataStream} over this connected stream with
     * separate batch and slide sizes for each input.
     *
     * @param batchSize1 batch size for the first input
     * @param batchSize2 batch size for the second input
     * @param slideSize1 slide size for the first input
     * @param slideSize2 slide size for the second input
     * @return the batched connected stream
     * @throws IllegalArgumentException if any size is smaller than 1
     */
    public CoBatchedDataStream<IN1, IN2> batch(
            long batchSize1, long batchSize2, long slideSize1, long slideSize2) {
        if (batchSize1 < 1 || batchSize2 < 1) {
            throw new IllegalArgumentException("Batch size must be positive");
        }
        if (slideSize1 < 1 || slideSize2 < 1) {
            throw new IllegalArgumentException("Slide size must be positive");
        }
        return new CoBatchedDataStream<>(this, batchSize1, batchSize2, slideSize1, slideSize2);
    }

    /**
     * Creates a {@link CoBatchedDataStream} whose slide sizes equal the
     * corresponding batch sizes.
     *
     * @param batchSize1 batch (and slide) size for the first input
     * @param batchSize2 batch (and slide) size for the second input
     * @return the batched connected stream
     * @throws IllegalArgumentException if either size is smaller than 1
     */
    public CoBatchedDataStream<IN1, IN2> batch(long batchSize1, long batchSize2) {
        return batch(batchSize1, batchSize2, batchSize1, batchSize2);
    }

    /**
     * Creates a {@link CoWindowDataStream} over this connected stream with
     * separate window sizes, slide intervals and timestamps for each input.
     *
     * @param windowSize1    window size for the first input
     * @param windowSize2    window size for the second input
     * @param slideInterval1 slide interval for the first input
     * @param slideInterval2 slide interval for the second input
     * @param timeStamp      timestamp for elements of the first input
     * @param timeStamp2     timestamp for elements of the second input
     * @return the windowed connected stream
     * @throws IllegalArgumentException if any size or interval is smaller than 1
     */
    public CoWindowDataStream<IN1, IN2> window(
            long windowSize1,
            long windowSize2,
            long slideInterval1,
            long slideInterval2,
            TimeStamp<IN1> timeStamp,
            TimeStamp<IN2> timeStamp2) {
        if (windowSize1 < 1 || windowSize2 < 1) {
            throw new IllegalArgumentException("Window size must be positive");
        }
        if (slideInterval1 < 1 || slideInterval2 < 1) {
            throw new IllegalArgumentException("Slide interval must be positive");
        }
        return new CoWindowDataStream<>(
                this, windowSize1, windowSize2, slideInterval1, slideInterval2, timeStamp, timeStamp2);
    }

    /**
     * Same as the six-argument {@code window}, using a new
     * {@link DefaultTimeStamp} for each input.
     *
     * @param windowSize1    window size for the first input
     * @param windowSize2    window size for the second input
     * @param slideInterval1 slide interval for the first input
     * @param slideInterval2 slide interval for the second input
     * @return the windowed connected stream
     * @throws IllegalArgumentException if any size or interval is smaller than 1
     */
    public CoWindowDataStream<IN1, IN2> window(
            long windowSize1, long windowSize2, long slideInterval1, long slideInterval2) {
        return window(
                windowSize1,
                windowSize2,
                slideInterval1,
                slideInterval2,
                new DefaultTimeStamp(),
                new DefaultTimeStamp());
    }

    /**
     * Same as the six-argument {@code window}, with slide intervals equal to
     * the corresponding window sizes.
     *
     * @param windowSize1 window size (and slide interval) for the first input
     * @param windowSize2 window size (and slide interval) for the second input
     * @param timeStamp   timestamp for elements of the first input
     * @param timeStamp2  timestamp for elements of the second input
     * @return the windowed connected stream
     * @throws IllegalArgumentException if either size is smaller than 1
     */
    public CoWindowDataStream<IN1, IN2> window(
            long windowSize1, long windowSize2, TimeStamp<IN1> timeStamp, TimeStamp<IN2> timeStamp2) {
        return window(windowSize1, windowSize2, windowSize1, windowSize2, timeStamp, timeStamp2);
    }

    /**
     * Same as the six-argument {@code window}, with slide intervals equal to
     * the corresponding window sizes and a new {@link DefaultTimeStamp} for
     * each input.
     *
     * @param windowSize1 window size (and slide interval) for the first input
     * @param windowSize2 window size (and slide interval) for the second input
     * @return the windowed connected stream
     * @throws IllegalArgumentException if either size is smaller than 1
     */
    public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) {
        return window(
                windowSize1,
                windowSize2,
                windowSize1,
                windowSize2,
                new DefaultTimeStamp(),
                new DefaultTimeStamp());
    }

    /**
     * Registers a co-map operator (named {@code "coMap"}) driven by the given
     * function.
     *
     * @param coMapFunction user function applied to the two inputs
     * @param <OUT>         output element type
     * @return the operator representing the transformed stream
     * @throws RuntimeException if the function cannot be serialized
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapFunction) {
        return addCoFunction(
                "coMap",
                coMapFunction,
                new FunctionTypeWrapper(coMapFunction, CoMapFunction.class, 0),
                new FunctionTypeWrapper(coMapFunction, CoMapFunction.class, 1),
                new FunctionTypeWrapper(coMapFunction, CoMapFunction.class, 2),
                new CoMapInvokable(coMapFunction));
    }

    /**
     * Registers a co-flat-map operator (named {@code "coFlatMap"}) driven by
     * the given function.
     *
     * @param coFlatMapFunction user function applied to the two inputs
     * @param <OUT>             output element type
     * @return the operator representing the transformed stream
     * @throws RuntimeException if the function cannot be serialized
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
            CoFlatMapFunction<IN1, IN2, OUT> coFlatMapFunction) {
        return addCoFunction(
                "coFlatMap",
                coFlatMapFunction,
                new FunctionTypeWrapper(coFlatMapFunction, CoFlatMapFunction.class, 0),
                new FunctionTypeWrapper(coFlatMapFunction, CoFlatMapFunction.class, 1),
                new FunctionTypeWrapper(coFlatMapFunction, CoFlatMapFunction.class, 2),
                new CoFlatMapInvokable(coFlatMapFunction));
    }

    /**
     * Registers a co-reduce operator (named {@code "coReduce"}) driven by the
     * given function. The invokable is chosen by
     * {@link #getReduceInvokable(CoReduceFunction)}, so a grouped connected
     * stream gets the grouped variant.
     *
     * @param coReduceFunction user reduce function for the two inputs
     * @param <OUT>            output element type
     * @return the operator representing the transformed stream
     * @throws RuntimeException if the function cannot be serialized
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(
            CoReduceFunction<IN1, IN2, OUT> coReduceFunction) {
        return addCoFunction(
                "coReduce",
                coReduceFunction,
                new FunctionTypeWrapper(coReduceFunction, CoReduceFunction.class, 0),
                new FunctionTypeWrapper(coReduceFunction, CoReduceFunction.class, 1),
                new FunctionTypeWrapper(coReduceFunction, CoReduceFunction.class, 2),
                getReduceInvokable(coReduceFunction));
    }

    /**
     * Registers a co-window-reduce operator using a new
     * {@link DefaultTimeStamp} for each input.
     *
     * @param coWindowFunction user window function for the two inputs
     * @param windowSize       window size
     * @param slideInterval    slide interval
     * @param <OUT>            output element type
     * @return the operator representing the transformed stream
     * @throws IllegalArgumentException if the size or interval is smaller than 1
     * @throws RuntimeException         if the function cannot be serialized
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
            CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval) {
        return windowReduce(
                coWindowFunction,
                windowSize,
                slideInterval,
                new DefaultTimeStamp(),
                new DefaultTimeStamp());
    }

    /**
     * Registers a co-window-reduce operator (named {@code "coWindowReduce"})
     * driven by the given function, window size, slide interval and
     * timestamps.
     *
     * @param coWindowFunction user window function for the two inputs
     * @param windowSize       window size
     * @param slideInterval    slide interval
     * @param timeStamp        timestamp for elements of the first input
     * @param timeStamp2       timestamp for elements of the second input
     * @param <OUT>            output element type
     * @return the operator representing the transformed stream
     * @throws IllegalArgumentException if the size or interval is smaller than 1
     * @throws RuntimeException         if the function cannot be serialized
     */
    public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
            CoWindowFunction<IN1, IN2, OUT> coWindowFunction,
            long windowSize,
            long slideInterval,
            TimeStamp<IN1> timeStamp,
            TimeStamp<IN2> timeStamp2) {
        if (windowSize < 1) {
            throw new IllegalArgumentException("Window size must be positive");
        }
        if (slideInterval < 1) {
            throw new IllegalArgumentException("Slide interval must be positive");
        }
        return addCoFunction(
                "coWindowReduce",
                coWindowFunction,
                new FunctionTypeWrapper(coWindowFunction, CoWindowFunction.class, 0),
                new FunctionTypeWrapper(coWindowFunction, CoWindowFunction.class, 1),
                new FunctionTypeWrapper(coWindowFunction, CoWindowFunction.class, 2),
                new CoWindowInvokable(coWindowFunction, windowSize, slideInterval, timeStamp, timeStamp2));
    }

    /**
     * Chooses the reduce invokable: a {@link CoGroupedReduceInvokable} keyed
     * on the recorded key positions when this connected stream is grouped,
     * otherwise a plain {@link CoReduceInvokable}.
     *
     * @param coReduceFunction user reduce function for the two inputs
     * @param <OUT>            output element type
     * @return the invokable wrapping the reduce function
     */
    protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
            CoReduceFunction<IN1, IN2, OUT> coReduceFunction) {
        return this.isGrouped
                ? new CoGroupedReduceInvokable(coReduceFunction, this.keyPosition1, this.keyPosition2)
                : new CoReduceInvokable(coReduceFunction);
    }

    /**
     * Creates the output operator for a co-function, registers the
     * corresponding co-task with the job graph builder (using the
     * environment's degree of parallelism) and wires the two input streams to
     * it as inputs 1 and 2.
     *
     * @param functionName   operator name passed to the job graph
     * @param function       user function; it is cast to {@link Serializable}
     *                       and serialized
     * @param in1TypeWrapper type wrapper for the first input
     * @param in2TypeWrapper type wrapper for the second input
     * @param outTypeWrapper type wrapper for the output
     * @param coInvokable    invokable executing the function
     * @param <OUT>          output element type
     * @return the newly created output operator
     * @throws RuntimeException if the function cannot be serialized; the
     *                          underlying {@link SerializationException} is
     *                          attached as the cause
     */
    protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(
            String functionName,
            Function function,
            TypeWrapper<IN1> in1TypeWrapper,
            TypeWrapper<IN2> in2TypeWrapper,
            TypeWrapper<OUT> outTypeWrapper,
            CoInvokable<IN1, IN2, OUT> coInvokable) {
        SingleOutputStreamOperator<OUT, ?> returnStream =
                new SingleOutputStreamOperator<>(this.environment, functionName, outTypeWrapper);
        try {
            this.dataStream1.jobGraphBuilder.addCoTask(
                    returnStream.getId(),
                    coInvokable,
                    in1TypeWrapper,
                    in2TypeWrapper,
                    outTypeWrapper,
                    functionName,
                    SerializationUtils.serialize((Serializable) function),
                    this.environment.getDegreeOfParallelism());
            this.dataStream1.connectGraph(this.dataStream1, returnStream.getId(), 1);
            this.dataStream1.connectGraph(this.dataStream2, returnStream.getId(), 2);
            return returnStream;
        } catch (SerializationException e) {
            // Keep the original exception as the cause so the reason the
            // function could not be serialized is not lost.
            throw new RuntimeException("Cannot serialize user defined function", e);
        }
    }

    /**
     * @return a new connected stream created with the copy constructor
     */
    protected ConnectedDataStream<IN1, IN2> copy() {
        return new ConnectedDataStream<>(this);
    }
}
