/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.neo4j;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.neo4j.Neo4jIO;
import org.apache.beam.sdk.io.neo4j.Neo4jTestUtil;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
import org.testcontainers.containers.Neo4jContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(value=JUnit4.class)
public class Neo4jIOIT {
    private static Neo4jContainer<?> neo4jContainer;
    private static String containerHostname;
    private static int containerPort;
    @Rule
    public transient TestPipeline parameterizedReadPipeline = TestPipeline.create();
    @Rule
    public transient TestPipeline writeUnwindPipeline = TestPipeline.create();
    @Rule
    public transient TestPipeline largeWriteUnwindPipeline = TestPipeline.create();

    @BeforeClass
    public static void setup() throws Exception {
        neo4jContainer = (Neo4jContainer)((Neo4jContainer)((Neo4jContainer)((Neo4jContainer)new Neo4jContainer(DockerImageName.parse((String)"neo4j").withTag("latest")).withStartupAttempts(1)).withAdminPassword("abcd").withEnv("dbms_default_listen_address", "0.0.0.0")).withNetworkAliases(new String[]{"neo4jcontainer"})).withSharedMemorySize(Long.valueOf(0x10000000L));
        neo4jContainer.start();
        containerHostname = neo4jContainer.getContainerIpAddress();
        containerPort = neo4jContainer.getMappedPort(7687);
        Neo4jTestUtil.executeOnNeo4j(containerHostname, containerPort, "CREATE CONSTRAINT something_id_unique FOR (n:Something) REQUIRE n.id IS UNIQUE", true);
    }

    @AfterClass
    public static void tearDown() {
        neo4jContainer.stop();
        neo4jContainer.close();
    }

    @Test
    public void testParameterizedRead() throws Exception {
        PCollection stringsCollections = (PCollection)this.parameterizedReadPipeline.apply((PTransform)Create.of(Arrays.asList("one", "two", "three")));
        Schema outputSchema = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"One", (Schema.FieldType)Schema.FieldType.INT32), Schema.Field.of((String)"Str", (Schema.FieldType)Schema.FieldType.STRING)});
        SerializableFunction & Serializable parametersFunction = (SerializableFunction & Serializable)string -> Collections.singletonMap("par1", string);
        Neo4jIO.RowMapper & Serializable rowMapper = (Neo4jIO.RowMapper & Serializable)record -> {
            int one = record.get(0).asInt();
            String string = record.get(1).asString();
            return Row.withSchema((Schema)outputSchema).attachValues(new Object[]{one, string});
        };
        Neo4jIO.ReadAll read = Neo4jIO.readAll().withCypher("RETURN 1, $par1").withDriverConfiguration(Neo4jTestUtil.getDriverConfiguration(containerHostname, containerPort)).withSessionConfig(SessionConfig.forDatabase((String)"neo4j")).withRowMapper((Neo4jIO.RowMapper)rowMapper).withParametersFunction((SerializableFunction)parametersFunction).withCoder((Coder)SerializableCoder.of(Row.class)).withCypherLogging();
        PCollection outputRows = (PCollection)stringsCollections.apply((PTransform)read);
        PCollection outputLines = (PCollection)outputRows.apply((PTransform)ParDo.of((DoFn)new ParameterizedReadRowToLineFn()));
        PAssert.that((PCollection)outputLines).containsInAnyOrder((Object[])new String[]{"1,one", "1,two", "1,three"});
        PipelineResult pipelineResult = this.parameterizedReadPipeline.run();
        Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)pipelineResult.getState());
    }

    @Test
    public void testWriteUnwind() throws Exception {
        PCollection stringsCollections = (PCollection)this.writeUnwindPipeline.apply((PTransform)Create.of(Arrays.asList("one", "two", "three")));
        SerializableFunction & Serializable parametersMapper = (SerializableFunction & Serializable)name -> Collections.singletonMap("name", name);
        Neo4jIO.WriteUnwind read = Neo4jIO.writeUnwind().withDriverConfiguration(Neo4jTestUtil.getDriverConfiguration(containerHostname, containerPort)).withSessionConfig(SessionConfig.forDatabase((String)"neo4j")).withBatchSize(5000L).withUnwindMapName("rows").withCypher("UNWIND $rows AS row MERGE(n:Num { name : row.name })").withParametersFunction((SerializableFunction)parametersMapper).withCypherLogging();
        stringsCollections.apply((PTransform)read);
        PipelineResult pipelineResult = this.writeUnwindPipeline.run();
        Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)pipelineResult.getState());
        try (Driver driver = Neo4jTestUtil.getDriver(containerHostname, containerPort);
             Session session = Neo4jTestUtil.getSession(driver, true);){
            List names = (List)session.readTransaction(tx -> {
                ArrayList<String> list = new ArrayList<String>();
                Result result = tx.run("MATCH(n:Num) RETURN n.name");
                while (result.hasNext()) {
                    Record record = result.next();
                    list.add(record.get(0).asString());
                }
                return list;
            });
            MatcherAssert.assertThat((Object)names, (Matcher)IsIterableContainingInAnyOrder.containsInAnyOrder((Object[])new String[]{"one", "two", "three"}));
        }
    }

    @Test
    public void testLargeWriteUnwind() throws Exception {
        int startId = 5000;
        int endId = 6000;
        ArrayList<Integer> idList = new ArrayList<Integer>();
        for (int id2 = 5000; id2 < 6000; ++id2) {
            idList.add(id2);
        }
        PCollection idCollection = (PCollection)this.largeWriteUnwindPipeline.apply((PTransform)Create.of(idList));
        SerializableFunction & Serializable parametersFunction = (SerializableFunction & Serializable)id -> ImmutableMap.of((Object)"id", (Object)id, (Object)"name", (Object)"Casters", (Object)"firstName", (Object)"Matt");
        Neo4jIO.WriteUnwind read = Neo4jIO.writeUnwind().withDriverConfiguration(Neo4jTestUtil.getDriverConfiguration(containerHostname, containerPort)).withSessionConfig(SessionConfig.forDatabase((String)"neo4j")).withBatchSize(123L).withUnwindMapName("rows").withCypher("UNWIND $rows AS row CREATE(n:Something { id : row.id })").withParametersFunction((SerializableFunction)parametersFunction).withCypherLogging();
        idCollection.apply((PTransform)read);
        PipelineResult pipelineResult = this.largeWriteUnwindPipeline.run();
        Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)pipelineResult.getState());
        try (Driver driver = Neo4jTestUtil.getDriver(containerHostname, containerPort);
             Session session = Neo4jTestUtil.getSession(driver, true);){
            List values = (List)session.readTransaction(tx -> {
                List<Integer> v = null;
                int nrRows = 0;
                Result result = tx.run("MATCH(n:Something) RETURN count(n), min(n.id), max(n.id)");
                while (result.hasNext()) {
                    Record record = result.next();
                    v = Arrays.asList(record.get(0).asInt(), record.get(1).asInt(), record.get(2).asInt(), ++nrRows);
                }
                return v;
            });
            Assert.assertNotNull((Object)values);
            MatcherAssert.assertThat((Object)values, (Matcher)IsIterableContainingInOrder.contains((Object[])new Integer[]{1000, 5000, 5999, 1}));
        }
    }

    private static class ParameterizedReadRowToLineFn
    extends DoFn<Row, String> {
        private ParameterizedReadRowToLineFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) {
            Row row = (Row)context.element();
            assert (row != null);
            int one = row.getInt32(0);
            String string = row.getString(1);
            context.output((Object)(one + "," + string));
        }
    }
}

