package org.apache.samza.sql.translator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
import org.apache.samza.sql.translator.JoinInputNode;
import org.apache.samza.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/samza/sql/translator/JoinTranslator.class */
public class JoinTranslator {
    private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
    private String logicalOpId;
    private final String intermediateStreamPrefix;
    private final int queryId;
    private final TranslatorInputMetricsMapFunction inputMetricsMF;
    private final TranslatorOutputMetricsMapFunction outputMetricsMF;

    /* JADX INFO: Access modifiers changed from: package-private */
    public JoinTranslator(String str, String str2, int i) {
        this.logicalOpId = str;
        this.intermediateStreamPrefix = str2 + (str2.isEmpty() ? SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX : "_");
        this.queryId = i;
        this.inputMetricsMF = new TranslatorInputMetricsMapFunction(str);
        this.outputMetricsMF = new TranslatorOutputMetricsMapFunction(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void translate(LogicalJoin logicalJoin, TranslatorContext translatorContext) {
        JoinInputNode.InputType inputType = JoinInputNode.getInputType(logicalJoin.getLeft(), translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource());
        JoinInputNode.InputType inputType2 = JoinInputNode.getInputType(logicalJoin.getRight(), translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource());
        validateJoinQuery(logicalJoin, inputType, inputType2);
        boolean z = inputType2 != JoinInputNode.InputType.STREAM;
        final LinkedList linkedList = new LinkedList();
        final LinkedList linkedList2 = new LinkedList();
        int fieldCount = logicalJoin.getLeft().getRowType().getFieldCount();
        final int i = z ? fieldCount : 0;
        final int i2 = z ? 0 : fieldCount;
        final int fieldCount2 = z ? logicalJoin.getRowType().getFieldCount() : fieldCount;
        logicalJoin.getCondition().accept(new RexShuttle() { // from class: org.apache.samza.sql.translator.JoinTranslator.1
            /* renamed from: visitInputRef, reason: merged with bridge method [inline-methods] */
            public RexNode m28visitInputRef(RexInputRef rexInputRef) {
                JoinTranslator.this.validateJoinKeyType(rexInputRef);
                int index = rexInputRef.getIndex();
                if (index < i || index >= fieldCount2) {
                    linkedList.add(Integer.valueOf(index - i2));
                } else {
                    linkedList2.add(Integer.valueOf(index - i));
                }
                return rexInputRef;
            }
        });
        Collections.sort(linkedList2);
        Collections.sort(linkedList);
        JoinInputNode joinInputNode = new JoinInputNode(z ? logicalJoin.getLeft() : logicalJoin.getRight(), linkedList, z ? inputType : inputType2, !z);
        JoinInputNode joinInputNode2 = new JoinInputNode(z ? logicalJoin.getRight() : logicalJoin.getLeft(), linkedList2, z ? inputType2 : inputType, z);
        MessageStream<SamzaSqlRelMessage> joinStreamWithTable = joinStreamWithTable(translatorContext.getMessageStream(joinInputNode.getRelNode().getId()), getTable(joinInputNode2, translatorContext), joinInputNode, joinInputNode2, logicalJoin, translatorContext);
        translatorContext.registerMessageStream(logicalJoin.getId(), joinStreamWithTable);
        joinStreamWithTable.map(this.outputMetricsMF);
    }

    private MessageStream<SamzaSqlRelMessage> joinStreamWithTable(MessageStream<SamzaSqlRelMessage> messageStream, Table table, JoinInputNode joinInputNode, JoinInputNode joinInputNode2, LogicalJoin logicalJoin, TranslatorContext translatorContext) {
        List<Integer> keyIds = joinInputNode.getKeyIds();
        List<Integer> keyIds2 = joinInputNode2.getKeyIds();
        Validate.isTrue(keyIds.size() == keyIds2.size());
        log.info("Joining on the following Stream and Table field(s): ");
        ArrayList arrayList = new ArrayList(joinInputNode.getFieldNames());
        ArrayList arrayList2 = new ArrayList(joinInputNode2.getFieldNames());
        for (int i = 0; i < keyIds.size(); i++) {
            log.info(((String) arrayList.get(keyIds.get(i).intValue())) + " with " + ((String) arrayList2.get(keyIds2.get(i).intValue())));
        }
        if (!joinInputNode2.isRemoteTable()) {
            return messageStream.map(this.inputMetricsMF).partitionBy(samzaSqlRelMessage -> {
                return SamzaSqlRelMessage.createSamzaSqlCompositeKey(samzaSqlRelMessage, keyIds, SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames(arrayList2, keyIds2));
            }, samzaSqlRelMessage2 -> {
                return samzaSqlRelMessage2;
            }, KVSerde.of((SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null), (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null)), this.intermediateStreamPrefix + "stream_" + this.logicalOpId).map((v0) -> {
                return v0.getValue();
            }).join(table, new SamzaSqlLocalTableJoinFunction(joinInputNode, joinInputNode2, logicalJoin.getJoinType()), new Object[0]);
        }
        String sourceName = joinInputNode2.getSourceName();
        return messageStream.map(this.inputMetricsMF).join(table, new SamzaSqlRemoteTableJoinFunction(translatorContext.getMsgConverter(sourceName), translatorContext.getTableKeyConverter(sourceName), joinInputNode, joinInputNode2, logicalJoin.getJoinType(), this.queryId), new Object[0]);
    }

    private void validateJoinQuery(LogicalJoin logicalJoin, JoinInputNode.InputType inputType, JoinInputNode.InputType inputType2) {
        JoinRelType joinType = logicalJoin.getJoinType();
        if (joinType.compareTo(JoinRelType.INNER) != 0 && joinType.compareTo(JoinRelType.LEFT) != 0 && joinType.compareTo(JoinRelType.RIGHT) != 0) {
            throw new SamzaException("Query with only INNER and LEFT/RIGHT OUTER join are supported.");
        }
        boolean z = inputType != JoinInputNode.InputType.STREAM;
        boolean z2 = inputType2 != JoinInputNode.InputType.STREAM;
        if (!z && !z2) {
            throw new SamzaException("Invalid query with both sides of join being denoted as 'stream'. Stream-stream join is not yet supported. " + dumpRelPlanForNode(logicalJoin));
        }
        if (z && z2) {
            throw new SamzaException("Invalid query with both sides of join being denoted as 'table'. " + dumpRelPlanForNode(logicalJoin));
        }
        if (joinType.compareTo(JoinRelType.LEFT) == 0 && z) {
            throw new SamzaException("Invalid query for outer left join. Left side of the join should be a 'stream' and right side of join should be a 'table'. " + dumpRelPlanForNode(logicalJoin));
        }
        if (joinType.compareTo(JoinRelType.RIGHT) == 0 && z2) {
            throw new SamzaException("Invalid query for outer right join. Left side of the join should be a 'table' and right side of join should be a 'stream'. " + dumpRelPlanForNode(logicalJoin));
        }
        ArrayList arrayList = new ArrayList();
        decomposeAndValidateConjunction(logicalJoin.getCondition(), arrayList);
        if (arrayList.isEmpty()) {
            throw new SamzaException("Query results in a cross join, which is not supported. Please optimize the query. It is expected that the joins should include JOIN ON operator in the sql query.");
        }
        arrayList.forEach(rexNode -> {
        });
        if ((z2 ? inputType2 : inputType).compareTo(JoinInputNode.InputType.REMOTE_TABLE) != 0) {
            return;
        }
        final ArrayList arrayList2 = new ArrayList();
        logicalJoin.getCondition().accept(new RexShuttle() { // from class: org.apache.samza.sql.translator.JoinTranslator.3
            /* renamed from: visitInputRef, reason: merged with bridge method [inline-methods] */
            public RexNode m30visitInputRef(RexInputRef rexInputRef) {
                arrayList2.add(rexInputRef);
                return rexInputRef;
            }
        });
        int fieldCount = z2 ? logicalJoin.getLeft().getRowType().getFieldCount() : 0;
        int fieldCount2 = z2 ? logicalJoin.getRowType().getFieldCount() : logicalJoin.getLeft().getRowType().getFieldCount();
        List list = (List) arrayList2.stream().map(rexInputRef -> {
            return Integer.valueOf(rexInputRef.getIndex());
        }).filter(num -> {
            return fieldCount <= num.intValue() && num.intValue() < fieldCount2;
        }).map(num2 -> {
            return Integer.valueOf(num2.intValue() - fieldCount);
        }).sorted().collect(Collectors.toList());
        if (arrayList.size() != 1 || list.size() != 1) {
            throw new SamzaException("Invalid query for join condition must contain exactly one predicate for remote table on __key__ column " + dumpRelPlanForNode(logicalJoin));
        }
        if (!isValidRemoteJoinRef(((Integer) list.get(0)).intValue(), z2 ? logicalJoin.getRight() : logicalJoin.getLeft())) {
            throw new SamzaException("Invalid query for join condition can not have an expression and must be reference __key__ column " + dumpRelPlanForNode(logicalJoin));
        }
    }

    private static boolean isValidRemoteJoinRef(int i, RelNode relNode) {
        if (relNode instanceof TableScan) {
            return ((RelDataTypeField) relNode.getRowType().getFieldList().get(i)).getName().equals(SamzaSqlRelMessage.KEY_NAME);
        }
        Preconditions.checkState(relNode.getInputs().size() == 1, "Has to be single input RelNode and got " + relNode.getDigest());
        if (relNode instanceof LogicalFilter) {
            return isValidRemoteJoinRef(i, relNode.getInput(0));
        }
        RexInputRef rexInputRef = (RexNode) ((LogicalProject) relNode).getProjects().get(i);
        if (rexInputRef instanceof RexCall) {
            return false;
        }
        return isValidRemoteJoinRef(rexInputRef.getIndex(), relNode.getInput(0));
    }

    public static void decomposeAndValidateConjunction(RexNode rexNode, List<RexNode> list) {
        if (rexNode == null || rexNode.isAlwaysTrue()) {
            return;
        }
        if (rexNode.isA(SqlKind.AND)) {
            Iterator it = ((RexCall) rexNode).getOperands().iterator();
            while (it.hasNext()) {
                decomposeAndValidateConjunction((RexNode) it.next(), list);
            }
        } else {
            if (!rexNode.isA(SqlKind.EQUALS)) {
                throw new SamzaException("Only equi-joins and AND operator is supported in join condition.");
            }
            list.add(rexNode);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void validateJoinKeyType(RexInputRef rexInputRef) {
        SqlTypeName sqlTypeName = rexInputRef.getType().getSqlTypeName();
        if (sqlTypeName == SqlTypeName.BOOLEAN || sqlTypeName == SqlTypeName.TINYINT || sqlTypeName == SqlTypeName.SMALLINT || sqlTypeName == SqlTypeName.INTEGER || sqlTypeName == SqlTypeName.CHAR || sqlTypeName == SqlTypeName.BIGINT || sqlTypeName == SqlTypeName.VARCHAR || sqlTypeName == SqlTypeName.DOUBLE || sqlTypeName == SqlTypeName.FLOAT || sqlTypeName == SqlTypeName.ANY || sqlTypeName == SqlTypeName.OTHER) {
            return;
        }
        log.error("Unsupported key type " + sqlTypeName + " used in join condition.");
        throw new SamzaException("Unsupported key type used in join condition.");
    }

    private String dumpRelPlanForNode(RelNode relNode) {
        return RelOptUtil.dumpPlan("Rel expression: ", relNode, SqlExplainFormat.TEXT, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static SqlIOConfig resolveSQlIOForTable(RelNode relNode, Map<String, SqlIOConfig> map) {
        if (relNode instanceof HepRelVertex) {
            return resolveSQlIOForTable(((HepRelVertex) relNode).getCurrentRel(), map);
        }
        if (relNode instanceof LogicalProject) {
            return resolveSQlIOForTable(((LogicalProject) relNode).getInput(), map);
        }
        if (relNode instanceof LogicalFilter) {
            return resolveSQlIOForTable(((LogicalFilter) relNode).getInput(), map);
        }
        if ((relNode instanceof LogicalJoin) && relNode.getInputs().size() > 1) {
            return null;
        }
        if (!(relNode instanceof TableScan)) {
            throw new SamzaException(String.format("Unsupported query. relNode %s is not of type TableScan.", relNode.toString()));
        }
        String sourceFromSourceParts = SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
        SqlIOConfig sqlIOConfig = map.get(sourceFromSourceParts);
        if (sqlIOConfig == null) {
            throw new SamzaException("Unsupported source found in join statement: " + sourceFromSourceParts);
        }
        return sqlIOConfig;
    }

    private Table getTable(JoinInputNode joinInputNode, TranslatorContext translatorContext) {
        SqlIOConfig resolveSQlIOForTable = resolveSQlIOForTable(joinInputNode.getRelNode(), translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource());
        if (resolveSQlIOForTable == null || !resolveSQlIOForTable.getTableDescriptor().isPresent()) {
            String str = "Failed to resolve table source in join operation: node=" + joinInputNode.getRelNode();
            log.error(str);
            throw new SamzaException(str);
        }
        Table table = translatorContext.getStreamAppDescriptor().getTable(resolveSQlIOForTable.getTableDescriptor().get());
        if (joinInputNode.isRemoteTable()) {
            return table;
        }
        MessageStream messageStream = translatorContext.getMessageStream(joinInputNode.getRelNode().getId());
        SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde samzaSqlRelRecordSerde = (SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde) new SamzaSqlRelRecordSerdeFactory().getSerde(null, null);
        SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde samzaSqlRelMessageSerde = (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
        List<Integer> keyIds = joinInputNode.getKeyIds();
        messageStream.partitionBy(samzaSqlRelMessage -> {
            return SamzaSqlRelMessage.createSamzaSqlCompositeKey(samzaSqlRelMessage, keyIds);
        }, samzaSqlRelMessage2 -> {
            return samzaSqlRelMessage2;
        }, KVSerde.of(samzaSqlRelRecordSerde, samzaSqlRelMessageSerde), this.intermediateStreamPrefix + "table_" + this.logicalOpId).sendTo(table);
        return table;
    }

    @VisibleForTesting
    public TranslatorInputMetricsMapFunction getInputMetricsMF() {
        return this.inputMetricsMF;
    }

    @VisibleForTesting
    public TranslatorOutputMetricsMapFunction getOutputMetricsMF() {
        return this.outputMetricsMF;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1236102726:
                if (implMethodName.equals("lambda$getTable$193c3537$1")) {
                    z = 2;
                    break;
                }
                break;
            case -828101576:
                if (implMethodName.equals("lambda$joinStreamWithTable$d560c6f0$1")) {
                    z = 4;
                    break;
                }
                break;
            case 196138697:
                if (implMethodName.equals("lambda$joinStreamWithTable$6028c617$1")) {
                    z = 3;
                    break;
                }
                break;
            case 769108134:
                if (implMethodName.equals("lambda$getTable$da92bfd4$1")) {
                    z = true;
                    break;
                }
                break;
            case 1967798203:
                if (implMethodName.equals("getValue")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/samza/operators/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/samza/operators/KV") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/samza/operators/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/samza/sql/translator/JoinTranslator") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Lorg/apache/samza/sql/data/SamzaSqlRelMessage;)Lorg/apache/samza/sql/SamzaSqlRelRecord;")) {
                    List list = (List) serializedLambda.getCapturedArg(0);
                    return samzaSqlRelMessage -> {
                        return SamzaSqlRelMessage.createSamzaSqlCompositeKey(samzaSqlRelMessage, list);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/samza/operators/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/samza/sql/translator/JoinTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/samza/sql/data/SamzaSqlRelMessage;)Lorg/apache/samza/sql/data/SamzaSqlRelMessage;")) {
                    return samzaSqlRelMessage2 -> {
                        return samzaSqlRelMessage2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/samza/operators/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/samza/sql/translator/JoinTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/samza/sql/data/SamzaSqlRelMessage;)Lorg/apache/samza/sql/data/SamzaSqlRelMessage;")) {
                    return samzaSqlRelMessage22 -> {
                        return samzaSqlRelMessage22;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/samza/operators/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/samza/sql/translator/JoinTranslator") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/samza/sql/data/SamzaSqlRelMessage;)Lorg/apache/samza/sql/SamzaSqlRelRecord;")) {
                    List list2 = (List) serializedLambda.getCapturedArg(0);
                    List list3 = (List) serializedLambda.getCapturedArg(1);
                    List list4 = (List) serializedLambda.getCapturedArg(2);
                    return samzaSqlRelMessage3 -> {
                        return SamzaSqlRelMessage.createSamzaSqlCompositeKey(samzaSqlRelMessage3, list2, SamzaSqlRelMessage.getSamzaSqlCompositeKeyFieldNames(list3, list4));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
