/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.text;

import java.io.File;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.NumberedShardedFile;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TextTableProviderTest {
    @Rule
    public TestPipeline pipeline = TestPipeline.create();
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder(){

        protected void after() {
        }
    };
    private static final String SQL_CSV_SCHEMA = "(f_string VARCHAR, f_int INT)";
    private static final Schema CSV_SCHEMA = Schema.builder().addNullableField("f_string", Schema.FieldType.STRING).addNullableField("f_int", Schema.FieldType.INT32).build();
    private static final Schema LINES_SCHEMA = Schema.builder().addStringField("f_string").build();
    private static final String SQL_LINES_SCHEMA = "(f_string VARCHAR)";
    private static final Schema SINGLE_STRING_CSV_SCHEMA = Schema.builder().addStringField("f_string").build();
    private static final String SINGLE_STRING_SQL_SCHEMA = "(f_string VARCHAR)";

    @Test
    public void testLegacyDefaultCsv() throws Exception {
        Files.write(this.tempFolder.newFile("test.csv").toPath(), "hello,13\n\ngoodbye,42\n".getBytes(Charsets.UTF_8), new OpenOption[0]);
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new TextTableProvider()});
        env.executeDdl(String.format("CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*'", SQL_CSV_SCHEMA, this.tempFolder.getRoot()));
        PCollection rows = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)env.parseQuery("SELECT * FROM test"));
        PAssert.that((PCollection)rows).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)CSV_SCHEMA).addValues(new Object[]{"hello", 13}).build(), Row.withSchema((Schema)CSV_SCHEMA).addValues(new Object[]{"goodbye", 42}).build()});
        this.pipeline.run();
    }

    @Test
    public void testLegacyTdfCsv() throws Exception {
        Files.write(this.tempFolder.newFile("test.csv").toPath(), "hello\t13\n\ngoodbye\t42\n".getBytes(Charsets.UTF_8), new OpenOption[0]);
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new TextTableProvider()});
        env.executeDdl(String.format("CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*' TBLPROPERTIES '{\"format\":\"TDF\"}'", SQL_CSV_SCHEMA, this.tempFolder.getRoot()));
        PCollection rows = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)env.parseQuery("SELECT * FROM test"));
        rows.apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.voids()).via((SerializableFunction & Serializable)r -> {
            System.out.println(r.toString());
            return null;
        }));
        PAssert.that((PCollection)rows).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)CSV_SCHEMA).addValues(new Object[]{"hello", 13}).build(), Row.withSchema((Schema)CSV_SCHEMA).addValues(new Object[]{"goodbye", 42}).build()});
        this.pipeline.run();
    }

    @Test
    public void testExplicitCsv() throws Exception {
        Files.write(this.tempFolder.newFile("test.csv").toPath(), "hello,13\n\ngoodbye,42\n".getBytes(Charsets.UTF_8), new OpenOption[0]);
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new TextTableProvider()});
        env.executeDdl(String.format("CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*' TBLPROPERTIES '{\"format\":\"csv\"}'", SQL_CSV_SCHEMA, this.tempFolder.getRoot()));
        PCollection rows = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)env.parseQuery("SELECT * FROM test"));
        PAssert.that((PCollection)rows).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)CSV_SCHEMA).addValues(new Object[]{"hello", 13}).build(), Row.withSchema((Schema)CSV_SCHEMA).addValues(new Object[]{"goodbye", 42}).build()});
        this.pipeline.run();
    }

    @Test
    public void testExplicitCsvExcel() throws Exception {
        Files.write(this.tempFolder.newFile("test.csv").toPath(), "hello\n\ngoodbye\n".getBytes(Charsets.UTF_8), new OpenOption[0]);
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new TextTableProvider()});
        env.executeDdl(String.format("CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*' TBLPROPERTIES '{\"format\":\"csv\", \"csvFormat\":\"Excel\"}'", "(f_string VARCHAR)", this.tempFolder.getRoot()));
        PCollection rows = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)env.parseQuery("SELECT * FROM test"));
        PAssert.that((PCollection)rows).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)SINGLE_STRING_CSV_SCHEMA).addValues(new Object[]{"hello"}).build(), Row.withSchema((Schema)SINGLE_STRING_CSV_SCHEMA).addValues(new Object[]{"goodbye"}).build()});
        this.pipeline.run();
    }

    @Test
    public void testLines() throws Exception {
        Files.write(this.tempFolder.newFile("test.csv").toPath(), "hello,13\ngoodbye,42\n".getBytes(Charsets.UTF_8), new OpenOption[0]);
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new TextTableProvider()});
        env.executeDdl(String.format("CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*' TBLPROPERTIES '{\"format\":\"lines\"}'", "(f_string VARCHAR)", this.tempFolder.getRoot()));
        PCollection rows = BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)env.parseQuery("SELECT * FROM test"));
        PAssert.that((PCollection)rows).containsInAnyOrder((Object[])new Row[]{Row.withSchema((Schema)LINES_SCHEMA).addValues(new Object[]{"hello,13"}).build(), Row.withSchema((Schema)LINES_SCHEMA).addValues(new Object[]{"goodbye,42"}).build()});
        this.pipeline.run();
    }

    @Test
    public void testWriteLines() throws Exception {
        File destinationFile = new File(this.tempFolder.getRoot(), "lines-outputs");
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new TextTableProvider()});
        env.executeDdl(String.format("CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"lines\"}'", "(f_string VARCHAR)", destinationFile.getAbsolutePath()));
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)env.parseQuery("INSERT INTO test VALUES ('hello'), ('goodbye')"));
        this.pipeline.run();
        Assert.assertThat((Object)new NumberedShardedFile(destinationFile.getAbsolutePath() + "*").readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF), (Matcher)Matchers.containsInAnyOrder((Object[])new String[]{"hello", "goodbye"}));
    }

    @Test
    public void testWriteCsv() throws Exception {
        File destinationFile = new File(this.tempFolder.getRoot(), "csv-outputs");
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new TextTableProvider()});
        env.executeDdl(String.format("CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\"}'", SQL_CSV_SCHEMA, destinationFile.getAbsolutePath()));
        BeamSqlRelUtils.toPCollection((Pipeline)this.pipeline, (BeamRelNode)env.parseQuery("INSERT INTO test VALUES ('hello', 42), ('goodbye', 13)"));
        this.pipeline.run();
        Assert.assertThat((Object)new NumberedShardedFile(destinationFile.getAbsolutePath() + "*").readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF), (Matcher)Matchers.containsInAnyOrder((Object[])new String[]{"hello,42", "goodbye,13"}));
    }
}

