/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodProcess;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.IMongodConfig;
import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.config.Storage;
import de.flapdoodle.embed.mongo.distribution.IFeatureAwareVersion;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.config.IExecutableProcessConfig;
import de.flapdoodle.embed.process.runtime.Network;
import java.io.Serializable;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamPushDownIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.mongodb.MongoDbTableProvider;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatchers;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class MongoDbReadWriteIT {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDbReadWriteIT.class);
    private static final Schema SOURCE_SCHEMA = Schema.builder().addNullableField("c_bigint", Schema.FieldType.INT64).addNullableField("c_tinyint", Schema.FieldType.BYTE).addNullableField("c_smallint", Schema.FieldType.INT16).addNullableField("c_integer", Schema.FieldType.INT32).addNullableField("c_float", Schema.FieldType.FLOAT).addNullableField("c_double", Schema.FieldType.DOUBLE).addNullableField("c_boolean", Schema.FieldType.BOOLEAN).addNullableField("c_varchar", Schema.FieldType.STRING).addNullableField("c_arr", Schema.FieldType.array((Schema.FieldType)Schema.FieldType.STRING)).build();
    private static final String hostname = "localhost";
    private static final String database = "beam";
    private static final String collection = "collection";
    @ClassRule
    public static final TemporaryFolder MONGODB_LOCATION = new TemporaryFolder();
    private static final MongodStarter mongodStarter = MongodStarter.getDefaultInstance();
    private static MongodExecutable mongodExecutable;
    private static MongodProcess mongodProcess;
    private static MongoClient client;
    private static BeamSqlEnv sqlEnv;
    private static String mongoSqlUrl;
    @Rule
    public final TestPipeline writePipeline = TestPipeline.create();
    @Rule
    public final TestPipeline readPipeline = TestPipeline.create();

    @BeforeClass
    public static void setUp() throws Exception {
        int port = NetworkTestHelper.getAvailableLocalPort();
        LOG.info("Starting MongoDB embedded instance on {}", (Object)port);
        IMongodConfig mongodConfig = new MongodConfigBuilder().version((IFeatureAwareVersion)Version.Main.PRODUCTION).configServer(false).replication(new Storage(MONGODB_LOCATION.getRoot().getPath(), null, 0)).net(new Net(hostname, port, Network.localhostIsIPv6())).cmdOptions(new MongoCmdOptionsBuilder().syncDelay(10).useNoPrealloc(true).useSmallFiles(true).useNoJournal(true).verbose(false).build()).build();
        mongodExecutable = (MongodExecutable)mongodStarter.prepare((IExecutableProcessConfig)mongodConfig);
        mongodProcess = (MongodProcess)mongodExecutable.start();
        client = new MongoClient(hostname, port);
        mongoSqlUrl = String.format("mongodb://%s:%d/%s/%s", hostname, port, database, collection);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        client.dropDatabase(database);
        client.close();
        mongodProcess.stop();
        mongodExecutable.stop();
    }

    @Before
    public void init() {
        sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new MongoDbTableProvider()});
        MongoDatabase db = client.getDatabase(database);
        db.runCommand((Bson)new BasicDBObject().append("profile", (Object)2));
    }

    @After
    public void cleanUp() {
        client.getDatabase(database).drop();
    }

    @Test
    public void testWriteAndRead() {
        Row testRow = this.row(SOURCE_SCHEMA, Long.MAX_VALUE, (byte)127, (short)Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), 1.0, true, "varchar", Arrays.asList("123", "456"));
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n    c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_varchar VARCHAR, \n    c_arr ARRAY<VARCHAR> \n) \nTYPE 'mongodb' \nLOCATION '" + mongoSqlUrl + "'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, 'varchar', ARRAY['123', '456'])";
        BeamRelNode insertRelNode = sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.writePipeline, (BeamRelNode)insertRelNode);
        this.writePipeline.run().waitUntilFinish();
        PCollection output = BeamSqlRelUtils.toPCollection((Pipeline)this.readPipeline, (BeamRelNode)sqlEnv.parseQuery("select * from TEST"));
        MatcherAssert.assertThat((Object)output.getSchema(), (Matcher)SerializableMatchers.equalTo((Serializable)SOURCE_SCHEMA));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Row[]{testRow});
        this.readPipeline.run().waitUntilFinish();
    }

    @Test
    public void testProjectPushDown() {
        Schema expectedSchema = Schema.builder().addNullableField("c_varchar", Schema.FieldType.STRING).addNullableField("c_boolean", Schema.FieldType.BOOLEAN).addNullableField("c_integer", Schema.FieldType.INT32).build();
        Row testRow = this.row(expectedSchema, "varchar", true, Integer.MAX_VALUE);
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n    c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_varchar VARCHAR, \n    c_arr ARRAY<VARCHAR> \n) \nTYPE 'mongodb' \nLOCATION '" + mongoSqlUrl + "'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, 'varchar', ARRAY['123', '456'])";
        BeamRelNode insertRelNode = sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.writePipeline, (BeamRelNode)insertRelNode);
        this.writePipeline.run().waitUntilFinish();
        BeamRelNode node = sqlEnv.parseQuery("select c_varchar, c_boolean, c_integer from TEST");
        MatcherAssert.assertThat((Object)node, (Matcher)IsInstanceOf.instanceOf(BeamPushDownIOSourceRel.class));
        MatcherAssert.assertThat((Object)node.getRowType().getFieldNames(), (Matcher)SerializableMatchers.containsInAnyOrder((Serializable[])new String[]{"c_varchar", "c_boolean", "c_integer"}));
        PCollection output = BeamSqlRelUtils.toPCollection((Pipeline)this.readPipeline, (BeamRelNode)node);
        MatcherAssert.assertThat((Object)output.getSchema(), (Matcher)SerializableMatchers.equalTo((Serializable)expectedSchema));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Row[]{testRow});
        this.readPipeline.run().waitUntilFinish();
        MongoDatabase db = client.getDatabase(database);
        MongoCollection coll = db.getCollection("system.profile");
        Object query = coll.find().filter(Filters.eq((String)"op", (Object)"query")).sort((Bson)new BasicDBObject().append("ts", (Object)-1)).iterator().next();
        MatcherAssert.assertThat((Object)query, (Matcher)IsInstanceOf.instanceOf(Document.class));
        Object command = ((Document)query).get((Object)"command");
        MatcherAssert.assertThat((Object)command, (Matcher)IsInstanceOf.instanceOf(Document.class));
        Object projection = ((Document)command).get((Object)"projection");
        MatcherAssert.assertThat((Object)projection, (Matcher)IsInstanceOf.instanceOf(Document.class));
        MatcherAssert.assertThat((Object)((Document)projection).keySet(), (Matcher)SerializableMatchers.containsInAnyOrder((Serializable[])new String[]{"c_varchar", "c_boolean", "c_integer"}));
    }

    @Test
    public void testPredicatePushDown() {
        Document expectedFilter = new Document().append("$or", (Object)ImmutableList.of((Object)new Document("c_varchar", (Object)"varchar"), (Object)new Document("c_varchar", (Object)new Document("$not", (Object)new Document("$eq", (Object)"fakeString")))).asList()).append("c_boolean", (Object)true).append("c_integer", (Object)Integer.MAX_VALUE);
        Schema expectedSchema = Schema.builder().addNullableField("c_varchar", Schema.FieldType.STRING).addNullableField("c_boolean", Schema.FieldType.BOOLEAN).addNullableField("c_integer", Schema.FieldType.INT32).build();
        Row testRow = this.row(expectedSchema, "varchar", true, Integer.MAX_VALUE);
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n   c_bigint BIGINT, \n    c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_varchar VARCHAR, \n    c_arr ARRAY<VARCHAR> \n) \nTYPE 'mongodb' \nLOCATION '" + mongoSqlUrl + "'";
        sqlEnv.executeDdl(createTableStatement);
        String insertStatement = "INSERT INTO TEST VALUES (9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, 'varchar', ARRAY['123', '456'])";
        BeamRelNode insertRelNode = sqlEnv.parseQuery(insertStatement);
        BeamSqlRelUtils.toPCollection((Pipeline)this.writePipeline, (BeamRelNode)insertRelNode);
        this.writePipeline.run().waitUntilFinish();
        BeamRelNode node = sqlEnv.parseQuery("select c_varchar, c_boolean, c_integer from TEST where (c_varchar='varchar' or c_varchar<>'fakeString') and c_boolean and c_integer=2147483647");
        MatcherAssert.assertThat((Object)node, (Matcher)IsInstanceOf.instanceOf(BeamPushDownIOSourceRel.class));
        MatcherAssert.assertThat((Object)node.getRowType().getFieldNames(), (Matcher)SerializableMatchers.containsInAnyOrder((Serializable[])new String[]{"c_varchar", "c_boolean", "c_integer"}));
        PCollection output = BeamSqlRelUtils.toPCollection((Pipeline)this.readPipeline, (BeamRelNode)node);
        MatcherAssert.assertThat((Object)output.getSchema(), (Matcher)SerializableMatchers.equalTo((Serializable)expectedSchema));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new Row[]{testRow});
        this.readPipeline.run().waitUntilFinish();
        MongoDatabase db = client.getDatabase(database);
        MongoCollection coll = db.getCollection("system.profile");
        Object query = coll.find().filter(Filters.eq((String)"op", (Object)"query")).sort((Bson)new BasicDBObject().append("ts", (Object)-1)).iterator().next();
        MatcherAssert.assertThat((Object)query, (Matcher)IsInstanceOf.instanceOf(Document.class));
        Object command = ((Document)query).get((Object)"command");
        MatcherAssert.assertThat((Object)command, (Matcher)IsInstanceOf.instanceOf(Document.class));
        Object filter = ((Document)command).get((Object)"filter");
        MatcherAssert.assertThat((Object)filter, (Matcher)IsInstanceOf.instanceOf(Document.class));
        Object projection = ((Document)command).get((Object)"projection");
        MatcherAssert.assertThat((Object)projection, (Matcher)IsInstanceOf.instanceOf(Document.class));
        MatcherAssert.assertThat((Object)((Document)projection).keySet(), (Matcher)SerializableMatchers.containsInAnyOrder((Serializable[])new String[]{"c_varchar", "c_boolean", "c_integer"}));
        MatcherAssert.assertThat((Object)((Document)filter), (Matcher)SerializableMatchers.equalTo((Serializable)expectedFilter));
    }

    private Row row(Schema schema, Object ... values) {
        return Row.withSchema((Schema)schema).addValues(values).build();
    }
}

