package org.apache.beam.sdk.extensions.sql.impl.rel;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.coders.BeamRecordCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdks.java.extensions.sql.repackaged.com.google.common.base.Joiner;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.RelNode;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.core.CorrelationId;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.core.Join;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.core.JoinRelType;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rex.RexCall;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rex.RexNode;
import org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.util.Pair;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.class */
public class BeamJoinRel extends Join implements BeamRelNode {
    public BeamJoinRel(RelOptCluster relOptCluster, RelTraitSet relTraitSet, RelNode relNode, RelNode relNode2, RexNode rexNode, Set<CorrelationId> set, JoinRelType joinRelType) {
        super(relOptCluster, relTraitSet, relNode, relNode2, rexNode, set, joinRelType);
    }

    @Override // org.apache.beam.sdks.java.extensions.sql.repackaged.org.apache.calcite.rel.core.Join
    public Join copy(RelTraitSet relTraitSet, RexNode rexNode, RelNode relNode, RelNode relNode2, JoinRelType joinRelType, boolean z) {
        return new BeamJoinRel(getCluster(), relTraitSet, relNode, relNode2, rexNode, this.variablesSet, joinRelType);
    }

    @Override // org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode
    public PCollection<BeamRecord> buildBeamPipeline(PCollectionTuple pCollectionTuple, BeamSqlEnv beamSqlEnv) throws Exception {
        BeamRelNode beamRelInput = BeamSqlRelUtils.getBeamRelInput(this.left);
        BeamRecordSqlType beamRowType = CalciteUtils.toBeamRowType(this.left.getRowType());
        BeamRelNode beamRelInput2 = BeamSqlRelUtils.getBeamRelInput(this.right);
        if (!seekable(beamRelInput, beamSqlEnv) && seekable(beamRelInput2, beamSqlEnv)) {
            return joinAsLookup(beamRelInput, beamRelInput2, pCollectionTuple, beamSqlEnv).setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
        }
        PCollection<BeamRecord> buildBeamPipeline = beamRelInput.buildBeamPipeline(pCollectionTuple, beamSqlEnv);
        PCollection<BeamRecord> buildBeamPipeline2 = beamRelInput2.buildBeamPipeline(pCollectionTuple, beamSqlEnv);
        String stageName = BeamSqlRelUtils.getStageName(this);
        WindowFn windowFn = buildBeamPipeline.getWindowingStrategy().getWindowFn();
        WindowFn windowFn2 = buildBeamPipeline2.getWindowingStrategy().getWindowFn();
        List<Pair<Integer, Integer>> extractJoinColumns = extractJoinColumns(beamRelInput.getRowType().getFieldCount());
        ArrayList arrayList = new ArrayList(extractJoinColumns.size());
        ArrayList arrayList2 = new ArrayList(extractJoinColumns.size());
        for (int i = 0; i < extractJoinColumns.size(); i++) {
            arrayList.add("c" + i);
            arrayList2.add(beamRowType.getFieldTypeByIndex(extractJoinColumns.get(i).getKey().intValue()));
        }
        BeamRecordCoder recordCoder = BeamRecordSqlType.create(arrayList, arrayList2).getRecordCoder();
        PCollection<KV<BeamRecord, BeamRecord>> coder = buildBeamPipeline.apply(stageName + "_left_ExtractJoinFields", MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, extractJoinColumns))).setCoder(KvCoder.of(recordCoder, buildBeamPipeline.getCoder()));
        PCollection<KV<BeamRecord, BeamRecord>> coder2 = buildBeamPipeline2.apply(stageName + "_right_ExtractJoinFields", MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, extractJoinColumns))).setCoder(KvCoder.of(recordCoder, buildBeamPipeline2.getCoder()));
        BeamRecord buildNullRow = buildNullRow(beamRelInput);
        BeamRecord buildNullRow2 = buildNullRow(beamRelInput2);
        if ((buildBeamPipeline.isBounded() == PCollection.IsBounded.BOUNDED && buildBeamPipeline2.isBounded() == PCollection.IsBounded.BOUNDED) || (buildBeamPipeline.isBounded() == PCollection.IsBounded.UNBOUNDED && buildBeamPipeline2.isBounded() == PCollection.IsBounded.UNBOUNDED)) {
            try {
                windowFn.verifyCompatibility(windowFn2);
                return standardJoin(coder, coder2, buildNullRow, buildNullRow2, stageName);
            } catch (IncompatibleWindowException e) {
                throw new IllegalArgumentException("WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
            }
        }
        if ((buildBeamPipeline.isBounded() != PCollection.IsBounded.BOUNDED || buildBeamPipeline2.isBounded() != PCollection.IsBounded.UNBOUNDED) && (buildBeamPipeline.isBounded() != PCollection.IsBounded.UNBOUNDED || buildBeamPipeline2.isBounded() != PCollection.IsBounded.BOUNDED)) {
            throw new UnsupportedOperationException("The inputs to the JOIN have un-joinnable windowFns: " + windowFn + ", " + windowFn2);
        }
        if (this.joinType == JoinRelType.FULL) {
            throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join a bounded table with an unbounded table.");
        }
        if ((this.joinType == JoinRelType.LEFT && buildBeamPipeline.isBounded() == PCollection.IsBounded.BOUNDED) || (this.joinType == JoinRelType.RIGHT && buildBeamPipeline2.isBounded() == PCollection.IsBounded.BOUNDED)) {
            throw new UnsupportedOperationException("LEFT side of an OUTER JOIN must be Unbounded table.");
        }
        return sideInputJoin(coder, coder2, buildNullRow, buildNullRow2);
    }

    private PCollection<BeamRecord> standardJoin(PCollection<KV<BeamRecord, BeamRecord>> pCollection, PCollection<KV<BeamRecord, BeamRecord>> pCollection2, BeamRecord beamRecord, BeamRecord beamRecord2, String str) {
        PCollection innerJoin;
        switch (this.joinType) {
            case LEFT:
                innerJoin = org.apache.beam.sdk.extensions.joinlibrary.Join.leftOuterJoin(pCollection, pCollection2, beamRecord2);
                break;
            case RIGHT:
                innerJoin = org.apache.beam.sdk.extensions.joinlibrary.Join.rightOuterJoin(pCollection, pCollection2, beamRecord);
                break;
            case FULL:
                innerJoin = org.apache.beam.sdk.extensions.joinlibrary.Join.fullOuterJoin(pCollection, pCollection2, beamRecord, beamRecord2);
                break;
            case INNER:
            default:
                innerJoin = org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(pCollection, pCollection2);
                break;
        }
        return innerJoin.apply(str + "_JoinParts2WholeRow", MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow())).setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
    }

    public PCollection<BeamRecord> sideInputJoin(PCollection<KV<BeamRecord, BeamRecord>> pCollection, PCollection<KV<BeamRecord, BeamRecord>> pCollection2, BeamRecord beamRecord, BeamRecord beamRecord2) {
        boolean z = pCollection.isBounded() == PCollection.IsBounded.BOUNDED;
        return sideInputJoinHelper((!z || this.joinType == JoinRelType.INNER) ? this.joinType : JoinRelType.LEFT, z ? pCollection2 : pCollection, z ? pCollection : pCollection2, z ? beamRecord : beamRecord2, z);
    }

    private PCollection<BeamRecord> sideInputJoinHelper(JoinRelType joinRelType, PCollection<KV<BeamRecord, BeamRecord>> pCollection, PCollection<KV<BeamRecord, BeamRecord>> pCollection2, BeamRecord beamRecord, boolean z) {
        PCollectionView apply = pCollection2.apply(View.asMultimap());
        return pCollection.apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(joinRelType, beamRecord, apply, z)).withSideInputs(new PCollectionView[]{apply})).setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
    }

    private BeamRecord buildNullRow(BeamRelNode beamRelNode) {
        BeamRecordSqlType beamRowType = CalciteUtils.toBeamRowType(beamRelNode.getRowType());
        return new BeamRecord(beamRowType, Collections.nCopies(beamRowType.getFieldCount(), null));
    }

    private List<Pair<Integer, Integer>> extractJoinColumns(int i) {
        if ((this.condition instanceof RexLiteral) && ((Boolean) ((RexLiteral) this.condition).getValue()).booleanValue()) {
            throw new UnsupportedOperationException("CROSS JOIN is not supported!");
        }
        RexCall rexCall = (RexCall) this.condition;
        ArrayList arrayList = new ArrayList();
        if ("AND".equals(rexCall.getOperator().getName())) {
            Iterator<RexNode> it = rexCall.getOperands().iterator();
            while (it.hasNext()) {
                arrayList.add(extractOneJoinColumn((RexCall) it.next(), i));
            }
        } else {
            if (!"=".equals(rexCall.getOperator().getName())) {
                throw new UnsupportedOperationException("Operator " + rexCall.getOperator().getName() + " is not supported in join condition");
            }
            arrayList.add(extractOneJoinColumn(rexCall, i));
        }
        return arrayList;
    }

    private Pair<Integer, Integer> extractOneJoinColumn(RexCall rexCall, int i) {
        List<RexNode> operands = rexCall.getOperands();
        return new Pair<>(Integer.valueOf(Math.min(((RexInputRef) operands.get(0)).getIndex(), ((RexInputRef) operands.get(1)).getIndex())), Integer.valueOf(Math.max(((RexInputRef) operands.get(0)).getIndex(), ((RexInputRef) operands.get(1)).getIndex()) - i));
    }

    private PCollection<BeamRecord> joinAsLookup(BeamRelNode beamRelNode, BeamRelNode beamRelNode2, PCollectionTuple pCollectionTuple, BeamSqlEnv beamSqlEnv) throws Exception {
        return beamRelNode.buildBeamPipeline(pCollectionTuple, beamSqlEnv).apply("join_as_lookup", new BeamJoinTransforms.JoinAsLookup(this.condition, getSeekableTableFromRelNode(beamRelNode2, beamSqlEnv), CalciteUtils.toBeamRowType(beamRelNode2.getRowType()), CalciteUtils.toBeamRowType(beamRelNode.getRowType()).getFieldCount()));
    }

    private BeamSqlSeekableTable getSeekableTableFromRelNode(BeamRelNode beamRelNode, BeamSqlEnv beamSqlEnv) {
        return (BeamSqlSeekableTable) beamSqlEnv.findTable(Joiner.on('.').join(((BeamIOSourceRel) beamRelNode).getTable().getQualifiedName()));
    }

    private boolean seekable(BeamRelNode beamRelNode, BeamSqlEnv beamSqlEnv) {
        return (beamRelNode instanceof BeamIOSourceRel) && (beamSqlEnv.findTable(Joiner.on('.').join(((BeamIOSourceRel) beamRelNode).getTable().getQualifiedName())) instanceof BeamSqlSeekableTable);
    }
}
