package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.List;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperatorBuilder;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/**
 * Stream exec node that joins two inputs whose rows each carry a window-end column.
 *
 * <p>Both inputs must use a row-time {@link WindowAttachedWindowingStrategy}; any other
 * strategy is rejected at construction time. The node is translated into a keyed
 * {@link TwoInputTransformation} whose state key is derived from the join keys of the
 * {@link JoinSpec}.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamExecWindowJoin extends ExecNodeBase<RowData> implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
    public static final String FIELD_NAME_JOIN_SPEC = "joinSpec";
    public static final String FIELD_NAME_LEFT_WINDOWING = "leftWindowing";
    public static final String FIELD_NAME_RIGHT_WINDOWING = "rightWindowing";

    @JsonProperty(FIELD_NAME_JOIN_SPEC)
    private final JoinSpec joinSpec;

    @JsonProperty(FIELD_NAME_LEFT_WINDOWING)
    private final WindowingStrategy leftWindowing;

    @JsonProperty(FIELD_NAME_RIGHT_WINDOWING)
    private final WindowingStrategy rightWindowing;

    /**
     * Creates a node with a freshly allocated node id.
     *
     * @param joinSpec join type, keys and condition
     * @param leftWindowing windowing strategy of the left input
     * @param rightWindowing windowing strategy of the right input
     * @param leftInputProperty input property of the left input
     * @param rightInputProperty input property of the right input
     * @param outputType row type produced by this node
     * @param description description of this node, also used as the transformation name
     */
    public StreamExecWindowJoin(
            JoinSpec joinSpec,
            WindowingStrategy leftWindowing,
            WindowingStrategy rightWindowing,
            InputProperty leftInputProperty,
            InputProperty rightInputProperty,
            RowType outputType,
            String description) {
        this(
                joinSpec,
                leftWindowing,
                rightWindowing,
                getNewNodeId(),
                Lists.newArrayList(leftInputProperty, rightInputProperty),
                outputType,
                description);
    }

    /**
     * Creates a node from its serialized properties.
     *
     * @throws IllegalArgumentException if {@code inputProperties} does not have exactly two
     *     entries
     * @throws NullPointerException if {@code joinSpec} or either windowing strategy is null
     * @throws TableException if either windowing strategy is not a row-time
     *     {@link WindowAttachedWindowingStrategy}
     */
    @JsonCreator
    public StreamExecWindowJoin(
            @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
            @JsonProperty(FIELD_NAME_LEFT_WINDOWING) WindowingStrategy leftWindowing,
            @JsonProperty(FIELD_NAME_RIGHT_WINDOWING) WindowingStrategy rightWindowing,
            @JsonProperty("id") int id,
            @JsonProperty("inputProperties") List<InputProperty> inputProperties,
            @JsonProperty("outputType") RowType outputType,
            @JsonProperty("description") String description) {
        super(id, inputProperties, outputType, description);
        Preconditions.checkArgument(inputProperties.size() == 2);
        this.joinSpec = Preconditions.checkNotNull(joinSpec);
        validate(leftWindowing, FIELD_NAME_LEFT_WINDOWING);
        validate(rightWindowing, FIELD_NAME_RIGHT_WINDOWING);
        this.leftWindowing = leftWindowing;
        this.rightWindowing = rightWindowing;
    }

    /**
     * Rejects windowing strategies this node cannot translate.
     *
     * @param windowing the strategy to check
     * @param propertyName name of the property being checked, used in the null-check message
     */
    private static void validate(WindowingStrategy windowing, String propertyName) {
        // Fail with a descriptive message instead of a bare NPE on the dereference below.
        Preconditions.checkNotNull(windowing, propertyName + " must not be null.");
        if (!windowing.isRowtime()) {
            throw new TableException("Processing time Window Join is not supported yet.");
        }
        if (!(windowing instanceof WindowAttachedWindowingStrategy)) {
            throw new TableException(windowing.getClass().getName() + " is not supported yet.");
        }
    }

    /**
     * Translates both inputs and wires them into a keyed window-join transformation.
     *
     * @param plannerBase planner used to translate the inputs and to obtain the table config
     * @return the two-input transformation performing the window join
     */
    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase) {
        // The casts are safe by construction: validate() only admits this strategy type.
        final int leftWindowEndIndex =
                ((WindowAttachedWindowingStrategy) this.leftWindowing).getWindowEnd();
        final int rightWindowEndIndex =
                ((WindowAttachedWindowingStrategy) this.rightWindowing).getWindowEnd();

        final ExecEdge leftInputEdge = getInputEdges().get(0);
        final ExecEdge rightInputEdge = getInputEdges().get(1);

        // translateToPlan is declared with a wildcard type; the edges are treated as RowData
        // producers here, matching the RowType/InternalTypeInfo<RowData> used below.
        @SuppressWarnings("unchecked")
        final Transformation<RowData> leftTransform =
                (Transformation<RowData>) leftInputEdge.translateToPlan(plannerBase);
        @SuppressWarnings("unchecked")
        final Transformation<RowData> rightTransform =
                (Transformation<RowData>) rightInputEdge.translateToPlan(plannerBase);

        final RowType leftType = (RowType) leftInputEdge.getOutputType();
        final RowType rightType = (RowType) rightInputEdge.getOutputType();
        JoinUtil.validateJoinSpec(this.joinSpec, leftType, rightType, true);

        final int[] leftJoinKeys = this.joinSpec.getLeftKeys();
        final int[] rightJoinKeys = this.joinSpec.getRightKeys();

        final InternalTypeInfo<RowData> leftTypeInfo = InternalTypeInfo.of(leftType);
        final InternalTypeInfo<RowData> rightTypeInfo = InternalTypeInfo.of(rightType);

        final TableConfig tableConfig = plannerBase.getTableConfig();
        final GeneratedJoinCondition generatedCondition =
                JoinUtil.generateConditionFunction(
                        tableConfig, this.joinSpec, leftType, rightType);

        final TwoInputTransformation<RowData, RowData, RowData> transform =
                new TwoInputTransformation<>(
                        leftTransform,
                        rightTransform,
                        getDescription(),
                        WindowJoinOperatorBuilder.builder()
                                .leftSerializer(leftTypeInfo.toRowSerializer())
                                .rightSerializer(rightTypeInfo.toRowSerializer())
                                .generatedJoinCondition(generatedCondition)
                                .leftWindowEndIndex(leftWindowEndIndex)
                                .rightWindowEndIndex(rightWindowEndIndex)
                                .filterNullKeys(this.joinSpec.getFilterNulls())
                                .joinType(this.joinSpec.getJoinType())
                                // The shift time zone is derived from the left input's time
                                // attribute type only.
                                .withShiftTimezone(
                                        TimeWindowUtil.getShiftTimeZone(
                                                this.leftWindowing.getTimeAttributeType(),
                                                tableConfig))
                                .build(),
                        InternalTypeInfo.of((RowType) getOutputType()),
                        // The join runs with the parallelism of the left input.
                        leftTransform.getParallelism());

        // Key both inputs by their join keys; the state key type comes from the left selector.
        final RowDataKeySelector leftSelector =
                KeySelectorUtil.getRowDataSelector(leftJoinKeys, leftTypeInfo);
        final RowDataKeySelector rightSelector =
                KeySelectorUtil.getRowDataSelector(rightJoinKeys, rightTypeInfo);
        transform.setStateKeySelectors(leftSelector, rightSelector);
        transform.setStateKeyType(leftSelector.getProducedType());
        return transform;
    }
}
