package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb;

import java.io.Serializable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.ToJson;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

@Experimental
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable.class */
public class MongoDbTable extends SchemaBaseBeamTable implements Serializable {

    @VisibleForTesting
    final Pattern locationPattern;

    @VisibleForTesting
    final String dbCollection;

    @VisibleForTesting
    final String dbName;

    @VisibleForTesting
    final String dbUri;

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable$DocumentToRow.class */
    public static class DocumentToRow extends PTransform<PCollection<Document>, PCollection<Row>> {
        private final Schema schema;

        /* JADX INFO: Access modifiers changed from: package-private */
        @VisibleForTesting
        /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable$DocumentToRow$DocumentToJsonStringConverter.class */
        public static class DocumentToJsonStringConverter extends DoFn<Document, String> {
            DocumentToJsonStringConverter() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<Document, String>.ProcessContext processContext) {
                processContext.output(((Document) processContext.element()).toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build()));
            }
        }

        private DocumentToRow(Schema schema) {
            this.schema = schema;
        }

        public static DocumentToRow withSchema(Schema schema) {
            return new DocumentToRow(schema);
        }

        public PCollection<Row> expand(PCollection<Document> pCollection) {
            return pCollection.apply("Convert Document to JSON", ParDo.of(new DocumentToJsonStringConverter())).apply("Transform JSON to Row", JsonToRow.withSchema(this.schema)).setRowSchema(this.schema);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable$RowToDocument.class */
    public static class RowToDocument extends PTransform<PCollection<Row>, PCollection<Document>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        @VisibleForTesting
        /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbTable$RowToDocument$ObjectToDocumentFn.class */
        public static class ObjectToDocumentFn extends SimpleFunction<String, Document> {
            ObjectToDocumentFn() {
            }

            public Document apply(String str) {
                return Document.parse(str);
            }
        }

        private RowToDocument() {
        }

        public static RowToDocument convert() {
            return new RowToDocument();
        }

        public PCollection<Document> expand(PCollection<Row> pCollection) {
            return pCollection.apply("Transform Rows to JSON", ToJson.of()).apply("Produce documents from JSON", MapElements.via(new ObjectToDocumentFn()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MongoDbTable(Table table) {
        super(table.getSchema());
        this.locationPattern = Pattern.compile("(?<credsHostPort>mongodb://(?<usernamePassword>.*(?<password>:.*)?@)?.+:\\d+)/(?<database>.+)/(?<collection>.+)");
        Matcher matcher = this.locationPattern.matcher(table.getLocation());
        Preconditions.checkArgument(matcher.matches(), "MongoDb location must be in the following format: 'mongodb://(username:password@)?localhost:27017/database/collection'");
        this.dbUri = matcher.group("credsHostPort");
        this.dbName = matcher.group("database");
        this.dbCollection = matcher.group("collection");
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable
    public PCollection<Row> buildIOReader(PBegin pBegin) {
        return MongoDbIO.read().withUri(this.dbUri).withDatabase(this.dbName).withCollection(this.dbCollection).expand(pBegin).apply(DocumentToRow.withSchema(getSchema()));
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable
    /* renamed from: buildIOWriter */
    public POutput mo88buildIOWriter(PCollection<Row> pCollection) {
        return pCollection.apply(new RowToDocument()).apply(MongoDbIO.write().withUri(this.dbUri).withDatabase(this.dbName).withCollection(this.dbCollection));
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable
    public PCollection.IsBounded isBounded() {
        return PCollection.IsBounded.BOUNDED;
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable
    public BeamTableStatistics getTableStatistics(PipelineOptions pipelineOptions) {
        long documentCount = MongoDbIO.read().withUri(this.dbUri).withDatabase(this.dbName).withCollection(this.dbCollection).getDocumentCount();
        return documentCount < 0 ? BeamTableStatistics.BOUNDED_UNKNOWN : BeamTableStatistics.createBoundedTableStatistics(Double.valueOf(documentCount));
    }
}
