package org.apache.beam.sdk.extensions.sql.meta.provider.mongodb;

import com.mongodb.MongoClient;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodProcess;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.config.Storage;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.runtime.Network;
import java.util.Arrays;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.class */
public class MongoDbReadWriteIT {
    private static final String hostname = "localhost";
    private static final String database = "beam";
    private static final String collection = "collection";
    private static int port;
    private static MongodExecutable mongodExecutable;
    private static MongodProcess mongodProcess;
    private static MongoClient client;

    @Rule
    public final TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public final TestPipeline readPipeline = TestPipeline.create();
    private static final Logger LOG = LoggerFactory.getLogger(MongoDbReadWriteIT.class);
    private static final Schema SOURCE_SCHEMA = Schema.builder().addNullableField("_id", Schema.FieldType.STRING).addNullableField("c_bigint", Schema.FieldType.INT64).addNullableField("c_tinyint", Schema.FieldType.BYTE).addNullableField("c_smallint", Schema.FieldType.INT16).addNullableField("c_integer", Schema.FieldType.INT32).addNullableField("c_float", Schema.FieldType.FLOAT).addNullableField("c_double", Schema.FieldType.DOUBLE).addNullableField("c_boolean", Schema.FieldType.BOOLEAN).addNullableField("c_varchar", Schema.FieldType.STRING).addNullableField("c_arr", Schema.FieldType.array(Schema.FieldType.STRING)).build();

    @ClassRule
    public static final TemporaryFolder MONGODB_LOCATION = new TemporaryFolder();
    private static final MongodStarter mongodStarter = MongodStarter.getDefaultInstance();

    @BeforeClass
    public static void setUp() throws Exception {
        port = NetworkTestHelper.getAvailableLocalPort();
        LOG.info("Starting MongoDB embedded instance on {}", Integer.valueOf(port));
        mongodExecutable = mongodStarter.prepare(new MongodConfigBuilder().version(Version.Main.PRODUCTION).configServer(false).replication(new Storage(MONGODB_LOCATION.getRoot().getPath(), (String) null, 0)).net(new Net(hostname, port, Network.localhostIsIPv6())).cmdOptions(new MongoCmdOptionsBuilder().syncDelay(10).useNoPrealloc(true).useSmallFiles(true).useNoJournal(true).verbose(false).build()).build());
        mongodProcess = mongodExecutable.start();
        client = new MongoClient(hostname, port);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        client.dropDatabase(database);
        client.close();
        mongodProcess.stop();
        mongodExecutable.stop();
    }

    @Test
    public void testWriteAndRead() {
        String format = String.format("mongodb://%s:%d/%s/%s", hostname, Integer.valueOf(port), database, collection);
        Row row = row(SOURCE_SCHEMA, "object_id", Long.MAX_VALUE, Byte.MAX_VALUE, Short.MAX_VALUE, Integer.MAX_VALUE, Float.valueOf(1.0f), Double.valueOf(1.0d), true, "varchar", Arrays.asList("123", "456"));
        String str = "CREATE EXTERNAL TABLE TEST( \n   _id VARCHAR, \n    c_bigint BIGINT, \n    c_tinyint TINYINT, \n   c_smallint SMALLINT, \n   c_integer INTEGER, \n   c_float FLOAT, \n   c_double DOUBLE, \n   c_boolean BOOLEAN, \n   c_varchar VARCHAR, \n    c_arr ARRAY<VARCHAR> \n) \nTYPE 'mongodb' \nLOCATION '" + format + "'";
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new MongoDbTableProvider()});
        inMemory.executeDdl(str);
        BeamSqlRelUtils.toPCollection(this.writePipeline, inMemory.parseQuery("INSERT INTO TEST VALUES ('object_id', 9223372036854775807, 127, 32767, 2147483647, 1.0, 1.0, TRUE, 'varchar', ARRAY['123', '456'])"));
        this.writePipeline.run().waitUntilFinish();
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.readPipeline, inMemory.parseQuery("select * from TEST"));
        Assert.assertEquals(pCollection.getSchema(), SOURCE_SCHEMA);
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{row});
        this.readPipeline.run().waitUntilFinish();
    }

    private Row row(Schema schema, Object... objArr) {
        return Row.withSchema(schema).addValues(objArr).build();
    }
}
