package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb;

import com.mongodb.client.model.Filters;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.IntFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImplConstants;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
import org.apache.beam.sdk.io.mongodb.FindQuery;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.FieldTypeDescriptors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.ToJson;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.class */
public class MongoDbTable extends SchemaBaseBeamTable implements Serializable {
    private static final Logger LOG;

    @VisibleForTesting
    final Pattern locationPattern;

    @VisibleForTesting
    final String dbCollection;

    @VisibleForTesting
    final String dbName;

    @VisibleForTesting
    final String dbUri;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTable$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind = new int[SqlKind.values().length];

        static {
            try {
                $SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind[SqlKind.IN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind[SqlKind.EQUALS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind[SqlKind.NOT_EQUALS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind[SqlKind.LESS_THAN.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind[SqlKind.GREATER_THAN.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind[SqlKind.GREATER_THAN_OR_EQUAL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind[SqlKind.LESS_THAN_OR_EQUAL.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind[SqlKind.AND.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind[SqlKind.OR.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable$DocumentToRow.class */
    public static class DocumentToRow extends PTransform<PCollection<Document>, PCollection<Row>> {
        private final Schema schema;

        /* JADX INFO: Access modifiers changed from: package-private */
        @VisibleForTesting
        /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable$DocumentToRow$DocumentToJsonStringConverter.class */
        public static class DocumentToJsonStringConverter extends DoFn<Document, String> {
            DocumentToJsonStringConverter() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<Document, String>.ProcessContext processContext) {
                processContext.output(((Document) processContext.element()).toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build()));
            }
        }

        private DocumentToRow(Schema schema) {
            this.schema = schema;
        }

        public static DocumentToRow withSchema(Schema schema) {
            return new DocumentToRow(schema);
        }

        public PCollection<Row> expand(PCollection<Document> pCollection) {
            return pCollection.apply("Convert Document to JSON", ParDo.of(new DocumentToJsonStringConverter())).apply("Transform JSON to Row", JsonToRow.withSchema(this.schema)).setRowSchema(this.schema);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable$MongoDbFilter.class */
    static class MongoDbFilter implements BeamSqlTableFilter {
        private List<RexNode> supported;
        private List<RexNode> unsupported;

        public MongoDbFilter(List<RexNode> list, List<RexNode> list2) {
            this.supported = list;
            this.unsupported = list2;
        }

        @Override // org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter
        public List<RexNode> getNotSupported() {
            return this.unsupported;
        }

        @Override // org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter
        public int numSupported() {
            return BeamSqlTableFilter.expressionsInFilter(this.supported);
        }

        public List<RexNode> getSupported() {
            return this.supported;
        }

        @SideEffectFree
        public String toString() {
            return "[" + ("supported{" + ((String) this.supported.stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining())) + "}") + ", " + ("unsupported{" + ((String) this.unsupported.stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining())) + "}") + "]";
        }

        public static MongoDbFilter create(List<RexNode> list) {
            ImmutableList.Builder builder = ImmutableList.builder();
            ImmutableList.Builder builder2 = ImmutableList.builder();
            for (RexNode rexNode : list) {
                if (!rexNode.getType().getSqlTypeName().equals(SqlTypeName.BOOLEAN)) {
                    throw new RuntimeException("Predicate node '" + rexNode.getClass().getSimpleName() + "' should be a boolean expression, but was: " + rexNode.getType().getSqlTypeName());
                }
                if (isSupported(rexNode)) {
                    builder.add(rexNode);
                } else {
                    builder2.add(rexNode);
                }
            }
            return new MongoDbFilter(builder.build(), builder2.build());
        }

        private static boolean isSupported(RexNode rexNode) {
            if (!(rexNode instanceof RexCall)) {
                if (rexNode instanceof RexInputRef) {
                    return true;
                }
                throw new RuntimeException("Encountered an unexpected node type: " + rexNode.getClass().getSimpleName());
            }
            RexCall rexCall = (RexCall) rexNode;
            if (!rexNode.getKind().belongsTo(SqlKind.COMPARISON) && !rexNode.getKind().equals(SqlKind.NOT)) {
                if (!rexNode.getKind().equals(SqlKind.AND) && !rexNode.getKind().equals(SqlKind.OR)) {
                    return false;
                }
                Iterator it = rexCall.getOperands().iterator();
                while (it.hasNext()) {
                    if (!isSupported((RexNode) it.next())) {
                        return false;
                    }
                }
                return true;
            }
            int i = 0;
            for (RexNode rexNode2 : rexCall.getOperands()) {
                if (rexNode2 instanceof RexInputRef) {
                    i++;
                } else if (!(rexNode2 instanceof RexLiteral)) {
                    return false;
                }
            }
            return i == 1;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable$RowToDocument.class */
    public static class RowToDocument extends PTransform<PCollection<Row>, PCollection<Document>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        @VisibleForTesting
        /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable$RowToDocument$ObjectToDocumentFn.class */
        public static class ObjectToDocumentFn extends SimpleFunction<String, Document> {
            ObjectToDocumentFn() {
            }

            public Document apply(String str) {
                return Document.parse(str);
            }
        }

        private RowToDocument() {
        }

        public static RowToDocument convert() {
            return new RowToDocument();
        }

        public PCollection<Document> expand(PCollection<Row> pCollection) {
            return pCollection.apply("Transform Rows to JSON", ToJson.of()).apply("Produce documents from JSON", MapElements.via(new ObjectToDocumentFn()));
        }

        /* synthetic */ RowToDocument(AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MongoDbTable(Table table) {
        super(table.getSchema());
        this.locationPattern = Pattern.compile("(?<credsHostPort>mongodb://(?<usernamePassword>.*(?<password>:.*)?@)?.+:\\d+)/(?<database>.+)/(?<collection>.+)");
        String location = table.getLocation();
        Matcher matcher = this.locationPattern.matcher(location);
        if (!matcher.matches()) {
            throw new InvalidTableException("MongoDb location must be in the following format: 'mongodb://(username:password@)?localhost:27017/database/collection' but was: " + location);
        }
        this.dbUri = matcher.group("credsHostPort");
        this.dbName = matcher.group("database");
        this.dbCollection = matcher.group("collection");
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable
    public PCollection<Row> buildIOReader(PBegin pBegin) {
        return MongoDbIO.read().withUri(this.dbUri).withDatabase(this.dbName).withCollection(this.dbCollection).expand(pBegin).apply(DocumentToRow.withSchema(getSchema()));
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable, org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable
    public PCollection<Row> buildIOReader(PBegin pBegin, BeamSqlTableFilter beamSqlTableFilter, List<String> list) {
        MongoDbIO.Read withCollection = MongoDbIO.read().withUri(this.dbUri).withDatabase(this.dbName).withCollection(this.dbCollection);
        Schema outputSchema = SelectHelpers.getOutputSchema(getSchema(), FieldAccessDescriptor.withFieldNames(list).resolve(getSchema()));
        FindQuery create = FindQuery.create();
        if (!(beamSqlTableFilter instanceof DefaultTableFilter)) {
            MongoDbFilter mongoDbFilter = (MongoDbFilter) beamSqlTableFilter;
            if (!mongoDbFilter.getSupported().isEmpty()) {
                Bson constructPredicate = constructPredicate(mongoDbFilter.getSupported());
                LOG.info("Pushing down the following filter: {}", constructPredicate);
                create = create.withFilters(constructPredicate);
            }
        }
        if (!list.isEmpty()) {
            create = create.withProjection(list);
        }
        return withCollection.withQueryFn(create).expand(pBegin).apply(DocumentToRow.withSchema(outputSchema));
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable
    /* renamed from: buildIOWriter */
    public POutput mo174buildIOWriter(PCollection<Row> pCollection) {
        return pCollection.apply(new RowToDocument(null)).apply(MongoDbIO.write().withUri(this.dbUri).withDatabase(this.dbName).withCollection(this.dbCollection));
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable, org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable
    public ProjectSupport supportsProjects() {
        return ProjectSupport.WITH_FIELD_REORDERING;
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable, org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable
    public BeamSqlTableFilter constructFilter(List<RexNode> list) {
        return MongoDbFilter.create(list);
    }

    private Bson constructPredicate(List<RexNode> list) {
        if (!$assertionsDisabled && list.isEmpty()) {
            throw new AssertionError();
        }
        List list2 = (List) list.stream().map(this::translateRexNodeToBson).collect(Collectors.toList());
        return list2.size() == 1 ? (Bson) list2.get(0) : Filters.and(list2);
    }

    private Bson translateRexNodeToBson(RexNode rexNode) {
        IntFunction intFunction = i -> {
            return getSchema().getField(i).getName();
        };
        if (!(rexNode instanceof RexCall)) {
            if ((rexNode instanceof RexInputRef) && rexNode.getType().getSqlTypeName().equals(SqlTypeName.BOOLEAN)) {
                return Filters.eq((String) intFunction.apply(((RexInputRef) rexNode).getIndex()), true);
            }
            throw new RuntimeException("Was expecting a RexCall or a boolean RexInputRef, but received: " + rexNode.getClass().getSimpleName());
        }
        RexCall rexCall = (RexCall) rexNode;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (RexInputRef rexInputRef : rexCall.getOperands()) {
            if (rexInputRef instanceof RexLiteral) {
                arrayList.add((RexLiteral) rexInputRef);
            } else if (rexInputRef instanceof RexInputRef) {
                arrayList2.add(rexInputRef);
            }
        }
        if (arrayList2.size() != 1) {
            switch (AnonymousClass1.$SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind[rexNode.getKind().ordinal()]) {
                case BeamSqlParserImplConstants.ADMIN /* 8 */:
                    return Filters.and((Iterable) rexCall.getOperands().stream().map(this::translateRexNodeToBson).collect(Collectors.toList()));
                case BeamSqlParserImplConstants.AFTER /* 9 */:
                    return Filters.or((Iterable) rexCall.getOperands().stream().map(this::translateRexNodeToBson).collect(Collectors.toList()));
            }
        }
        RexInputRef rexInputRef2 = (RexInputRef) arrayList2.get(0);
        String str = (String) intFunction.apply(rexInputRef2.getIndex());
        if (arrayList.size() <= 0) {
            if (rexNode.getKind().equals(SqlKind.NOT)) {
                return Filters.not(translateRexNodeToBson(rexInputRef2));
            }
            throw new RuntimeException("Cannot create a filter for an unsupported node: " + rexNode.toString());
        }
        Object convertToExpectedType = convertToExpectedType(rexInputRef2, arrayList.get(0));
        switch (AnonymousClass1.$SwitchMap$org$apache$beam$vendor$calcite$v1_28_0$org$apache$calcite$sql$SqlKind[rexNode.getKind().ordinal()]) {
            case 1:
                return Filters.in(str, new Object[]{convertToExpectedType(rexInputRef2, arrayList)});
            case 2:
                return Filters.eq(str, convertToExpectedType);
            case 3:
                return Filters.not(Filters.eq(str, convertToExpectedType));
            case 4:
                return Filters.lt(str, convertToExpectedType);
            case 5:
                return Filters.gt(str, convertToExpectedType);
            case 6:
                return Filters.gte(str, convertToExpectedType);
            case BeamSqlParserImplConstants.ADD /* 7 */:
                return Filters.lte(str, convertToExpectedType);
        }
        throw new RuntimeException("Encountered an unexpected node kind: " + rexNode.getKind().toString());
    }

    private Object convertToExpectedType(RexInputRef rexInputRef, RexLiteral rexLiteral) {
        return rexLiteral.getValueAs(FieldTypeDescriptors.javaTypeForFieldType(getSchema().getField(rexInputRef.getIndex()).getType()).getRawType());
    }

    private Object convertToExpectedType(RexInputRef rexInputRef, List<RexLiteral> list) {
        return list.stream().map(rexLiteral -> {
            return convertToExpectedType(rexInputRef, rexLiteral);
        }).collect(Collectors.toList());
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable
    public PCollection.IsBounded isBounded() {
        return PCollection.IsBounded.BOUNDED;
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable, org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable
    public BeamTableStatistics getTableStatistics(PipelineOptions pipelineOptions) {
        long documentCount = MongoDbIO.read().withUri(this.dbUri).withDatabase(this.dbName).withCollection(this.dbCollection).getDocumentCount();
        return documentCount < 0 ? BeamTableStatistics.BOUNDED_UNKNOWN : BeamTableStatistics.createBoundedTableStatistics(Double.valueOf(documentCount));
    }

    static {
        $assertionsDisabled = !MongoDbTable.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(MongoDbTable.class);
    }
}
