package org.apache.beam.sdk.extensions.sql.zetasql;

import com.google.zetasql.SqlException;
import java.time.LocalDate;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.impl.ScalarFunctionImpl;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.tools.Frameworks;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.codehaus.commons.compiler.CompileException;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.class */
public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {

    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private final String jarPathProperty = "beam.sql.udf.test.jar_path";
    private final String emptyJarPathProperty = "beam.sql.udf.test.empty_jar_path";
    private final String jarPath = System.getProperty("beam.sql.udf.test.jar_path");
    private final String emptyJarPath = System.getProperty("beam.sql.udf.test.empty_jar_path");

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest$IncrementFn.class */
    public static class IncrementFn implements BeamSqlUdf {
        public Long eval(Long l) {
            return Long.valueOf(l.longValue() + 1);
        }
    }

    @Before
    public void setUp() {
        if (this.jarPath == null) {
            Assert.fail(String.format("System property %s must be set to run %s.", "beam.sql.udf.test.jar_path", ZetaSqlJavaUdfTest.class.getSimpleName()));
        }
        if (this.emptyJarPath == null) {
            Assert.fail(String.format("System property %s must be set to run %s.", "beam.sql.udf.test.empty_jar_path", ZetaSqlJavaUdfTest.class.getSimpleName()));
        }
        initialize();
    }

    @Test
    public void testNullaryJavaUdf() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE FUNCTION helloWorld() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); SELECT helloWorld();", this.jarPath)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addStringField("field1").build()).addValues(new Object[]{"Hello world!"}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnaryJavaUdf() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); SELECT increment(1);", this.jarPath)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{2L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testJavaUdfColumnReference() {
        PCollection pCollection = BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); SELECT increment(int64_col) FROM table_all_types;", this.jarPath)));
        Schema build = Schema.builder().addInt64Field("field1").build();
        PAssert.that(pCollection).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{0L}).build(), Row.withSchema(build).addValues(new Object[]{-1L}).build(), Row.withSchema(build).addValues(new Object[]{-2L}).build(), Row.withSchema(build).addValues(new Object[]{-3L}).build(), Row.withSchema(build).addValues(new Object[]{-4L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testNestedJavaUdf() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); SELECT increment(increment(1));", this.jarPath)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUnexpectedNullArgumentThrowsRuntimeException() {
        BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); SELECT increment(NULL);", this.jarPath)));
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectMessage("CalcFn failed to evaluate");
        this.thrown.expectCause(Matchers.allOf(Matchers.isA(RuntimeException.class), Matchers.hasProperty("cause", Matchers.isA(NullPointerException.class))));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testExpectedNullArgument() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE FUNCTION isNull(s STRING) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); SELECT isNull(NULL);", this.jarPath)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addBooleanField("field1").build()).addValues(new Object[]{true}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testSqlTransformRegisterUdf() {
        PAssert.that(this.pipeline.apply(SqlTransform.query("SELECT increment(0);").withQueryPlannerClass(ZetaSQLQueryPlanner.class).registerUdf("increment", IncrementFn.class))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUdfFromCatalog() throws NoSuchMethodException {
        JdbcConnection connect = JdbcDriver.connect(new ReadOnlyTableProvider("empty_table_provider", ImmutableMap.of()), PipelineOptionsFactory.create());
        connect.getCurrentSchemaPlus().add("increment", ScalarFunctionImpl.create(IncrementFn.class.getMethod("eval", Long.class)));
        this.config = Frameworks.newConfigBuilder(this.config).defaultSchema(connect.getCurrentSchemaPlus()).build();
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel("SELECT increment(0);"))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{1L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testNullArgumentIsTypeChecked() {
        BeamRelNode convertToBeamRel = new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE FUNCTION isNull(i INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); SELECT isNull(NULL);", this.jarPath));
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage("Could not compile CalcFn");
        this.thrown.expectCause(Matchers.allOf(Matchers.isA(CompileException.class), Matchers.hasProperty("message", Matchers.containsString("No applicable constructor/method found for actual parameters \"java.lang.Long\""))));
        BeamSqlRelUtils.toPCollection(this.pipeline, convertToBeamRel);
    }

    @Test
    public void testFunctionSignatureTypeMismatchFailsPipelineConstruction() {
        BeamRelNode convertToBeamRel = new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE FUNCTION isNull(i INT64) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); SELECT isNull(0);", this.jarPath));
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage("Could not compile CalcFn");
        this.thrown.expectCause(Matchers.allOf(Matchers.isA(CompileException.class), Matchers.hasProperty("message", Matchers.containsString("No applicable constructor/method found for actual parameters \"long\""))));
        BeamSqlRelUtils.toPCollection(this.pipeline, convertToBeamRel);
    }

    @Test
    public void testJavaUdfWithNoReturnTypeIsRejected() {
        String format = String.format("CREATE FUNCTION helloWorld() LANGUAGE java OPTIONS (path='%s'); SELECT helloWorld();", this.jarPath);
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(SqlException.class);
        this.thrown.expectMessage("Non-SQL functions must specify a return type");
        zetaSQLQueryPlanner.convertToBeamRel(format);
    }

    @Test
    public void testProjectUdfAndBuiltin() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE FUNCTION matches(str STRING, regStr STRING) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); SELECT matches(\"a\", \"a\"), 'apple'='beta'", this.jarPath)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addBooleanField("field1").addBooleanField("field2").build()).addValues(new Object[]{true, false}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testProjectNestedUdfAndBuiltin() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); SELECT increment(increment(0) + 1);", this.jarPath)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{3L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testJavaUdfEmptyPath() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Failed to define function 'foo'");
        this.thrown.expectCause(Matchers.allOf(Matchers.isA(IllegalArgumentException.class), Matchers.hasProperty("message", Matchers.containsString("No jar was provided to define function foo."))));
        zetaSQLQueryPlanner.convertToBeamRel("CREATE FUNCTION foo() RETURNS STRING LANGUAGE java OPTIONS (path=''); SELECT foo();");
    }

    @Test
    public void testJavaUdfNoJarProvided() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Failed to define function 'foo'");
        this.thrown.expectCause(Matchers.allOf(Matchers.isA(IllegalArgumentException.class), Matchers.hasProperty("message", Matchers.containsString("No jar was provided to define function foo."))));
        zetaSQLQueryPlanner.convertToBeamRel("CREATE FUNCTION foo() RETURNS STRING LANGUAGE java; SELECT foo();");
    }

    @Test
    public void testPathOptionNotString() {
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Failed to define function 'foo'");
        this.thrown.expectCause(Matchers.allOf(Matchers.isA(IllegalArgumentException.class), Matchers.hasProperty("message", Matchers.containsString("Option 'path' has type TYPE_INT64 (expected TYPE_STRING)."))));
        zetaSQLQueryPlanner.convertToBeamRel("CREATE FUNCTION foo() RETURNS STRING LANGUAGE java OPTIONS (path=23); SELECT foo();");
    }

    @Test
    public void testUdaf() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE AGGREGATE FUNCTION my_sum(f INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); SELECT my_sum(f_int_1) from aggregate_test_table", this.jarPath)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{28L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testUdafNotFoundFailsToParse() {
        String format = String.format("CREATE AGGREGATE FUNCTION nonexistent(f INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); SELECT nonexistent(f_int_1) from aggregate_test_table", this.jarPath);
        ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(this.config);
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("Failed to define function 'nonexistent'");
        this.thrown.expectCause(Matchers.allOf(Matchers.isA(IllegalArgumentException.class), Matchers.hasProperty("message", Matchers.containsString("No implementation of aggregate function nonexistent found"))));
        zetaSQLQueryPlanner.convertToBeamRel(format);
    }

    @Test
    public void testRegisterUdaf() {
        PAssert.that(this.pipeline.apply(SqlTransform.query("SELECT my_sum(k) FROM UNNEST([1, 2, 3]) k;").withQueryPlannerClass(ZetaSQLQueryPlanner.class).registerUdaf("my_sum", Sum.ofLongs()))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt64Field("field1").build()).addValues(new Object[]{6L}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }

    @Test
    public void testDateUdf() {
        PAssert.that(BeamSqlRelUtils.toPCollection(this.pipeline, new ZetaSQLQueryPlanner(this.config).convertToBeamRel(String.format("CREATE FUNCTION dateIncrementAll(d DATE) RETURNS DATE LANGUAGE java OPTIONS (path='%s'); SELECT dateIncrementAll('2020-04-04');", this.jarPath)))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addLogicalTypeField("field1", SqlTypes.DATE).build()).addValues(new Object[]{LocalDate.of(2021, 5, 5)}).build()});
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(2L));
    }
}
