package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay;
import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil;
import org.apache.flink.table.runtime.operators.join.interval.IntervalJoinFunction;
import org.apache.flink.table.runtime.operators.join.interval.ProcTimeIntervalJoin;
import org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@JsonIgnoreProperties(ignoreUnknown = true)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.class */
public class StreamExecIntervalJoin extends ExecNodeBase<RowData> implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {
    /** Logger for this exec node; used to warn about a negative relative window size. */
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamExecIntervalJoin.class);
    /** JSON property name under which {@link #intervalJoinSpec} is serialized. */
    public static final String FIELD_NAME_INTERVAL_JOIN_SPEC = "intervalJoinSpec";

    /**
     * Join specification plus window bounds for this interval join. Never {@code null}: the
     * {@code @JsonCreator} constructor rejects a null value.
     */
    @JsonProperty(FIELD_NAME_INTERVAL_JOIN_SPEC)
    private final IntervalJoinSpec intervalJoinSpec;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin$FilterAllFlatMapFunction.class */
    public static class FilterAllFlatMapFunction implements FlatMapFunction<RowData, RowData>, ResultTypeQueryable<RowData> {
        private static final long serialVersionUID = 1;
        private final InternalTypeInfo<RowData> outputTypeInfo;

        public FilterAllFlatMapFunction(InternalTypeInfo<RowData> internalTypeInfo) {
            this.outputTypeInfo = internalTypeInfo;
        }

        public void flatMap(RowData rowData, Collector<RowData> collector) {
        }

        public TypeInformation<RowData> getProducedType() {
            return this.outputTypeInfo;
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((RowData) obj, (Collector<RowData>) collector);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin$PaddingLeftMapFunction.class */
    public static class PaddingLeftMapFunction implements MapFunction<RowData, RowData>, ResultTypeQueryable<RowData> {
        private static final long serialVersionUID = 1;
        private final OuterJoinPaddingUtil paddingUtil;
        private final InternalTypeInfo<RowData> outputTypeInfo;

        public PaddingLeftMapFunction(OuterJoinPaddingUtil outerJoinPaddingUtil, InternalTypeInfo<RowData> internalTypeInfo) {
            this.paddingUtil = outerJoinPaddingUtil;
            this.outputTypeInfo = internalTypeInfo;
        }

        public RowData map(RowData rowData) {
            return this.paddingUtil.padLeft(rowData);
        }

        public TypeInformation<RowData> getProducedType() {
            return this.outputTypeInfo;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin$PaddingRightMapFunction.class */
    public static class PaddingRightMapFunction implements MapFunction<RowData, RowData>, ResultTypeQueryable<RowData> {
        private static final long serialVersionUID = 1;
        private final OuterJoinPaddingUtil paddingUtil;
        private final InternalTypeInfo<RowData> outputTypeInfo;

        public PaddingRightMapFunction(OuterJoinPaddingUtil outerJoinPaddingUtil, InternalTypeInfo<RowData> internalTypeInfo) {
            this.paddingUtil = outerJoinPaddingUtil;
            this.outputTypeInfo = internalTypeInfo;
        }

        public RowData map(RowData rowData) {
            return this.paddingUtil.padRight(rowData);
        }

        public TypeInformation<RowData> getProducedType() {
            return this.outputTypeInfo;
        }
    }

    /**
     * Convenience constructor: allocates a fresh node id via {@code getNewNodeId()} and bundles
     * the two input properties (left first, right second) into a list before delegating to the
     * JSON-creator constructor.
     *
     * @param intervalJoinSpec join specification and window bounds
     * @param inputProperty input property of the first (left) input
     * @param inputProperty2 input property of the second (right) input
     * @param rowType output row type of this node
     * @param str description of this node
     */
    public StreamExecIntervalJoin(
            IntervalJoinSpec intervalJoinSpec,
            InputProperty inputProperty,
            InputProperty inputProperty2,
            RowType rowType,
            String str) {
        this(
                intervalJoinSpec,
                getNewNodeId(),
                Lists.newArrayList(inputProperty, inputProperty2),
                rowType,
                str);
    }

    /**
     * Full constructor, also used by Jackson for deserialization.
     *
     * @param intervalJoinSpec join specification and window bounds; must not be {@code null}
     * @param i node id
     * @param list input properties; must contain exactly two entries
     * @param rowType output row type of this node
     * @param str description of this node
     * @throws IllegalArgumentException if {@code list} does not have exactly two elements
     * @throws NullPointerException if {@code intervalJoinSpec} is {@code null}
     */
    @JsonCreator
    public StreamExecIntervalJoin(
            @JsonProperty(FIELD_NAME_INTERVAL_JOIN_SPEC) IntervalJoinSpec intervalJoinSpec,
            @JsonProperty("id") int i,
            @JsonProperty("inputProperties") List<InputProperty> list,
            @JsonProperty("outputType") RowType rowType,
            @JsonProperty("description") String str) {
        super(i, list, rowType, str);
        // An interval join is strictly binary: one left and one right input.
        Preconditions.checkArgument(list.size() == 2);
        this.intervalJoinSpec = Preconditions.checkNotNull(intervalJoinSpec);
    }

    /**
     * Translates this node into a {@link Transformation}.
     *
     * <p>For INNER, LEFT, RIGHT and FULL joins the relative window size
     * ({@code leftUpperBound - leftLowerBound}) is computed first:
     *
     * <ul>
     *   <li>if it is negative, a warning is logged and the result of
     *       {@code createNegativeWindowSizeJoin} is returned;
     *   <li>otherwise a row-time or proc-time join transformation is created (depending on
     *       {@code windowBounds.isEventTime()}), its parallelism is pinned to 1 when
     *       {@code inputsContainSingleton()} holds, and the state key selectors and key type are
     *       configured from the left/right join keys.
     * </ul>
     *
     * @param plannerBase planner used to translate the inputs and to obtain the table config
     * @return the transformation implementing this interval join
     * @throws TableException for any other join type
     */
    @SuppressWarnings("unchecked")
    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase) {
        ExecEdge leftInputEdge = getInputEdges().get(0);
        ExecEdge rightInputEdge = getInputEdges().get(1);
        RowType leftRowType = (RowType) leftInputEdge.getOutputType();
        RowType rightRowType = (RowType) rightInputEdge.getOutputType();
        // translateToPlan yields Transformation<?>; the helpers below require
        // Transformation<RowData>, hence the unchecked casts.
        Transformation<RowData> leftInputTransform =
                (Transformation<RowData>) leftInputEdge.translateToPlan(plannerBase);
        Transformation<RowData> rightInputTransform =
                (Transformation<RowData>) rightInputEdge.translateToPlan(plannerBase);
        InternalTypeInfo<RowData> returnTypeInfo = InternalTypeInfo.of((RowType) getOutputType());
        JoinSpec joinSpec = this.intervalJoinSpec.getJoinSpec();
        IntervalJoinSpec.WindowBounds windowBounds = this.intervalJoinSpec.getWindowBounds();
        switch (joinSpec.getJoinType()) {
            case INNER:
            case LEFT:
            case RIGHT:
            case FULL:
                long relativeWindowSize =
                        windowBounds.getLeftUpperBound() - windowBounds.getLeftLowerBound();
                if (relativeWindowSize < 0) {
                    LOGGER.warn(
                            "The relative time interval size {} is negative, please check the join conditions.",
                            relativeWindowSize);
                    return createNegativeWindowSizeJoin(
                            joinSpec,
                            leftInputTransform,
                            rightInputTransform,
                            leftRowType.getFieldCount(),
                            rightRowType.getFieldCount(),
                            returnTypeInfo);
                }
                IntervalJoinFunction joinFunction =
                        new IntervalJoinFunction(
                                JoinUtil.generateConditionFunction(
                                        plannerBase.getTableConfig(),
                                        joinSpec,
                                        leftRowType,
                                        rightRowType),
                                returnTypeInfo,
                                joinSpec.getFilterNulls());
                TwoInputTransformation<RowData, RowData, RowData> joinTransform =
                        windowBounds.isEventTime()
                                ? createRowTimeJoin(
                                        leftInputTransform,
                                        rightInputTransform,
                                        returnTypeInfo,
                                        joinFunction,
                                        joinSpec,
                                        windowBounds)
                                : createProcTimeJoin(
                                        leftInputTransform,
                                        rightInputTransform,
                                        returnTypeInfo,
                                        joinFunction,
                                        joinSpec,
                                        windowBounds);
                if (inputsContainSingleton()) {
                    joinTransform.setParallelism(1);
                    joinTransform.setMaxParallelism(1);
                }
                // Key the operator state by the join keys of each side.
                RowDataKeySelector leftKeySelector =
                        KeySelectorUtil.getRowDataSelector(
                                joinSpec.getLeftKeys(), InternalTypeInfo.of(leftRowType));
                RowDataKeySelector rightKeySelector =
                        KeySelectorUtil.getRowDataSelector(
                                joinSpec.getRightKeys(), InternalTypeInfo.of(rightRowType));
                joinTransform.setStateKeySelectors(leftKeySelector, rightKeySelector);
                joinTransform.setStateKeyType(leftKeySelector.getProducedType());
                return joinTransform;
            default:
                throw new TableException("Interval Join: " + joinSpec.getJoinType() + " Join between stream and stream is not supported yet.\nplease re-check interval join statement according to description above.");
        }
    }

    /**
     * Builds the plan used when the relative window size is negative. Each input is routed either
     * through a "filter all" flat map (emits nothing) or through a padding map, and the two
     * resulting streams are unioned:
     *
     * <ul>
     *   <li>INNER: filter-all left + filter-all right
     *   <li>LEFT: padded left + filter-all right
     *   <li>RIGHT: filter-all left + padded right
     *   <li>FULL: padded left + padded right
     * </ul>
     *
     * @param joinSpec join specification; only its join type is read here
     * @param transformation left input transformation
     * @param transformation2 right input transformation
     * @param i field count of the left input row type
     * @param i2 field count of the right input row type
     * @param internalTypeInfo output type information of the join
     * @return a union of the two per-side transformations selected by the join type
     * @throws TableException if the join type is none of INNER, LEFT, RIGHT, FULL
     */
    private Transformation<RowData> createNegativeWindowSizeJoin(JoinSpec joinSpec, Transformation<RowData> transformation, Transformation<RowData> transformation2, int i, int i2, InternalTypeInfo<RowData> internalTypeInfo) {
        FilterAllFlatMapFunction allFilter = new FilterAllFlatMapFunction(internalTypeInfo);
        OuterJoinPaddingUtil paddingUtil = new OuterJoinPaddingUtil(i, i2);
        PaddingLeftMapFunction leftPadder = new PaddingLeftMapFunction(paddingUtil, internalTypeInfo);
        PaddingRightMapFunction rightPadder = new PaddingRightMapFunction(paddingUtil, internalTypeInfo);
        int leftParallelism = transformation.getParallelism();
        int rightParallelism = transformation2.getParallelism();

        // All four candidates are created up front, in the same order as before; only two of
        // them end up in the returned union.
        OneInputTransformation<RowData, RowData> filterAllLeft =
                new OneInputTransformation<>(
                        transformation,
                        "filter all left input transformation",
                        new StreamFlatMap<>(allFilter),
                        internalTypeInfo,
                        leftParallelism);
        OneInputTransformation<RowData, RowData> filterAllRight =
                new OneInputTransformation<>(
                        transformation2,
                        "filter all right input transformation",
                        new StreamFlatMap<>(allFilter),
                        internalTypeInfo,
                        rightParallelism);
        OneInputTransformation<RowData, RowData> padLeft =
                new OneInputTransformation<>(
                        transformation,
                        "pad left input transformation",
                        new StreamMap<>(leftPadder),
                        internalTypeInfo,
                        leftParallelism);
        OneInputTransformation<RowData, RowData> padRight =
                new OneInputTransformation<>(
                        transformation2,
                        "pad right input transformation",
                        new StreamMap<>(rightPadder),
                        internalTypeInfo,
                        rightParallelism);

        switch (joinSpec.getJoinType()) {
            case INNER:
                return new UnionTransformation<>(Lists.newArrayList(filterAllLeft, filterAllRight));
            case LEFT:
                return new UnionTransformation<>(Lists.newArrayList(padLeft, filterAllRight));
            case RIGHT:
                return new UnionTransformation<>(Lists.newArrayList(filterAllLeft, padRight));
            case FULL:
                return new UnionTransformation<>(Lists.newArrayList(padLeft, padRight));
            default:
                throw new TableException("should not reach here");
        }
    }

    /**
     * Creates the two-input transformation for a processing-time interval join: a
     * {@link ProcTimeIntervalJoin} function wrapped in a {@link KeyedCoProcessOperator}.
     *
     * @param transformation left input transformation; its output type must be an
     *     {@link InternalTypeInfo} and its parallelism is used for the join
     * @param transformation2 right input transformation; its output type must be an
     *     {@link InternalTypeInfo}
     * @param internalTypeInfo output type information of the join
     * @param intervalJoinFunction join function handed to the {@link ProcTimeIntervalJoin}
     * @param joinSpec join specification; only its join type is read here
     * @param windowBounds supplies the left lower and upper bounds
     * @return the join transformation, named with this node's description
     */
    private TwoInputTransformation<RowData, RowData, RowData> createProcTimeJoin(Transformation<RowData> transformation, Transformation<RowData> transformation2, InternalTypeInfo<RowData> internalTypeInfo, IntervalJoinFunction intervalJoinFunction, JoinSpec joinSpec, IntervalJoinSpec.WindowBounds windowBounds) {
        InternalTypeInfo<RowData> leftTypeInfo =
                (InternalTypeInfo<RowData>) transformation.getOutputType();
        InternalTypeInfo<RowData> rightTypeInfo =
                (InternalTypeInfo<RowData>) transformation2.getOutputType();
        ProcTimeIntervalJoin procJoinFunction =
                new ProcTimeIntervalJoin(
                        joinSpec.getJoinType(),
                        windowBounds.getLeftLowerBound(),
                        windowBounds.getLeftUpperBound(),
                        leftTypeInfo,
                        rightTypeInfo,
                        intervalJoinFunction);
        return new TwoInputTransformation<>(
                transformation,
                transformation2,
                getDescription(),
                new KeyedCoProcessOperator<>(procJoinFunction),
                internalTypeInfo,
                transformation.getParallelism());
    }

    /**
     * Creates the two-input transformation for an event-time interval join: a
     * {@link RowTimeIntervalJoin} function wrapped in a
     * {@link KeyedCoProcessOperatorWithWatermarkDelay} configured with the function's
     * {@code getMaxOutputDelay()}.
     *
     * @param transformation left input transformation; its output type must be an
     *     {@link InternalTypeInfo} and its parallelism is used for the join
     * @param transformation2 right input transformation; its output type must be an
     *     {@link InternalTypeInfo}
     * @param internalTypeInfo output type information of the join
     * @param intervalJoinFunction join function handed to the {@link RowTimeIntervalJoin}
     * @param joinSpec join specification; only its join type is read here
     * @param windowBounds supplies the left lower/upper bounds and the left/right time indices
     * @return the join transformation, named with this node's description
     */
    private TwoInputTransformation<RowData, RowData, RowData> createRowTimeJoin(Transformation<RowData> transformation, Transformation<RowData> transformation2, InternalTypeInfo<RowData> internalTypeInfo, IntervalJoinFunction intervalJoinFunction, JoinSpec joinSpec, IntervalJoinSpec.WindowBounds windowBounds) {
        InternalTypeInfo<RowData> leftTypeInfo =
                (InternalTypeInfo<RowData>) transformation.getOutputType();
        InternalTypeInfo<RowData> rightTypeInfo =
                (InternalTypeInfo<RowData>) transformation2.getOutputType();
        RowTimeIntervalJoin rowJoinFunction =
                new RowTimeIntervalJoin(
                        joinSpec.getJoinType(),
                        windowBounds.getLeftLowerBound(),
                        windowBounds.getLeftUpperBound(),
                        // NOTE(review): fourth constructor argument, presumably the allowed
                        // lateness -- confirm against RowTimeIntervalJoin.
                        0L,
                        leftTypeInfo,
                        rightTypeInfo,
                        intervalJoinFunction,
                        windowBounds.getLeftTimeIdx(),
                        windowBounds.getRightTimeIdx());
        return new TwoInputTransformation<>(
                transformation,
                transformation2,
                getDescription(),
                new KeyedCoProcessOperatorWithWatermarkDelay<>(
                        rowJoinFunction, rowJoinFunction.getMaxOutputDelay()),
                internalTypeInfo,
                transformation.getParallelism());
    }
}
