package org.apache.beam.sdk.extensions.sql;

import com.google.auto.service.AutoService;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImplConstants;
import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.linq4j.function.Parameter;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.schema.TranslatableTable;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.class */
public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$CubicInteger.class */
    public static class CubicInteger implements BeamSqlUdf {
        public static Integer eval(Integer num) {
            return Integer.valueOf(num.intValue() * num.intValue() * num.intValue());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$CubicIntegerFn.class */
    public static class CubicIntegerFn implements SerializableFunction<Integer, Integer> {
        public Integer apply(Integer num) {
            return Integer.valueOf(num.intValue() * num.intValue() * num.intValue());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$JodaMax.class */
    public static class JodaMax extends Combine.CombineFn<Instant, Instant, Instant> {
        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Instant m6createAccumulator() {
            return new Instant(0L);
        }

        public Instant addInput(Instant instant, Instant instant2) {
            return instant.isBefore(instant2) ? instant2 : instant;
        }

        public Instant mergeAccumulators(Iterable<Instant> iterable) {
            ReadableInstant instant = new Instant(0L);
            Iterator<Instant> it = iterable.iterator();
            while (it.hasNext()) {
                ReadableInstant readableInstant = (Instant) it.next();
                instant = readableInstant.isBefore(instant) ? instant : readableInstant;
            }
            return instant;
        }

        public Instant extractOutput(Instant instant) {
            return instant;
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m5mergeAccumulators(Iterable iterable) {
            return mergeAccumulators((Iterable<Instant>) iterable);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$PreviousDate.class */
    public static final class PreviousDate implements BeamSqlUdf {
        public static Date eval(Date date) {
            return new Date(date.getTime() - 86400000);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$PreviousDay.class */
    public static final class PreviousDay implements BeamSqlUdf {
        public static Timestamp eval(Timestamp timestamp) {
            return new Timestamp(timestamp.getTime() - 86400000);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$PreviousHour.class */
    public static final class PreviousHour implements BeamSqlUdf {
        public static Time eval(Time time) {
            return new Time(time.getTime() - 3600000);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$RangeUdf.class */
    public static final class RangeUdf implements BeamSqlUdf {
        public static TranslatableTable eval(int i, int i2) {
            return BeamCalciteTable.of(new TestBoundedTable(Schema.of(new Schema.Field[]{Schema.Field.of("f0", Schema.FieldType.INT32)})).addRows(IntStream.range(i, i2).boxed().toArray()));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$RawCombineFn.class */
    public static class RawCombineFn extends Combine.CombineFn {
        public Object createAccumulator() {
            return null;
        }

        public Object addInput(Object obj, Object obj2) {
            return null;
        }

        public Object mergeAccumulators(Iterable iterable) {
            return null;
        }

        public Object extractOutput(Object obj) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$SquareAndAccumulateInList.class */
    public static class SquareAndAccumulateInList extends Combine.CombineFn<Integer, List<Integer>, List<Integer>> {
        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public List<Integer> m8createAccumulator() {
            return new ArrayList();
        }

        public List<Integer> addInput(List<Integer> list, Integer num) {
            list.add(Integer.valueOf(num.intValue() * num.intValue()));
            return list;
        }

        public List<Integer> mergeAccumulators(Iterable<List<Integer>> iterable) {
            List<Integer> m8createAccumulator = m8createAccumulator();
            Iterator<List<Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                m8createAccumulator.addAll(it.next());
            }
            return m8createAccumulator;
        }

        public List<Integer> extractOutput(List<Integer> list) {
            Collections.sort(list);
            return list;
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m7mergeAccumulators(Iterable iterable) {
            return mergeAccumulators((Iterable<List<Integer>>) iterable);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$SquareAndAccumulateInMap.class */
    public static class SquareAndAccumulateInMap extends Combine.CombineFn<Integer, Map<String, Integer>, Map<String, Integer>> {
        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Map<String, Integer> m10createAccumulator() {
            return new HashMap();
        }

        public Map<String, Integer> addInput(Map<String, Integer> map, Integer num) {
            map.put("squareOf-" + num, Integer.valueOf(num.intValue() * num.intValue()));
            return map;
        }

        public Map<String, Integer> mergeAccumulators(Iterable<Map<String, Integer>> iterable) {
            Map<String, Integer> m10createAccumulator = m10createAccumulator();
            Iterator<Map<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                m10createAccumulator.putAll(it.next());
            }
            return m10createAccumulator;
        }

        public Map<String, Integer> extractOutput(Map<String, Integer> map) {
            return map;
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m9mergeAccumulators(Iterable iterable) {
            return mergeAccumulators((Iterable<Map<String, Integer>>) iterable);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$SquareSquareSum.class */
    public static class SquareSquareSum extends SquareSum {
        @Override // org.apache.beam.sdk.extensions.sql.BeamSqlDslUdfUdafTest.SquareSum
        public Integer addInput(Integer num, Integer num2) {
            return super.addInput(num, Integer.valueOf(num2.intValue() * num2.intValue()));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$SquareSum.class */
    public static class SquareSum extends Combine.CombineFn<Integer, Integer, Integer> {
        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Integer m12createAccumulator() {
            return 0;
        }

        @Override // 
        public Integer addInput(Integer num, Integer num2) {
            return Integer.valueOf(num.intValue() + (num2.intValue() * num2.intValue()));
        }

        public Integer mergeAccumulators(Iterable<Integer> iterable) {
            int i = 0;
            Iterator<Integer> it = iterable.iterator();
            while (it.hasNext()) {
                i += it.next().intValue();
            }
            return Integer.valueOf(i);
        }

        public Integer extractOutput(Integer num) {
            return num;
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m11mergeAccumulators(Iterable iterable) {
            return mergeAccumulators((Iterable<Integer>) iterable);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$TestListLength.class */
    public static final class TestListLength implements BeamSqlUdf {
        public static Integer eval(List<Long> list) {
            return Integer.valueOf(list.size());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$TestReturnTypeList.class */
    public static final class TestReturnTypeList implements BeamSqlUdf {
        public static List<Long> eval(Long l) {
            return Arrays.asList(l);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$UdfFnWithDefault.class */
    public static final class UdfFnWithDefault implements BeamSqlUdf {
        public static String eval(@Parameter(name = "s") String str, @Parameter(name = "n", optional = true) Integer num) {
            return str.substring(0, num == null ? 1 : num.intValue());
        }
    }

    @AutoService({UdfUdafProvider.class})
    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest$UdfUdafProviderTest.class */
    public static class UdfUdafProviderTest implements UdfUdafProvider {
        public Map<String, Class<? extends BeamSqlUdf>> getBeamSqlUdfs() {
            return ImmutableMap.of("autoload_cubic", CubicInteger.class);
        }

        public Map<String, Combine.CombineFn> getUdafs() {
            return ImmutableMap.of("autoload_squaresum", new SquareSum());
        }
    }

    @Test
    public void testUdaf() throws Exception {
        PAssert.that(this.boundedInput1.apply("testUdaf", SqlTransform.query("SELECT f_int2, squaresum(f_int) AS `squaresum` FROM PCOLLECTION GROUP BY f_int2").registerUdaf("squaresum", new SquareSum()))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt32Field("f_int2").addInt32Field("squaresum").build()).addValues(new Object[]{0, 30}).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testTimestampUdaf() throws Exception {
        PAssert.that(this.boundedInput1.apply("testJodaUdaf", SqlTransform.query("SELECT MAX_JODA(f_timestamp) as jodatime FROM PCOLLECTION").registerUdaf("MAX_JODA", new JodaMax()))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addDateTimeField("jodatime").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithoutTimeZone("2017-01-01 02:04:03")}).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testDateUdf() throws Exception {
        PAssert.that(this.boundedInput1.apply("testTimeUdf", SqlTransform.query("SELECT PRE_DATE(f_date) as result_date FROM PCOLLECTION WHERE f_int=1").registerUdf("PRE_DATE", PreviousDate.class))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addField("result_date", Schema.FieldType.logicalType(SqlTypes.DATE)).build()).addValues(new Object[]{LocalDate.of(2016, 12, 31)}).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testTimeUdf() throws Exception {
        PAssert.that(this.boundedInput1.apply("testTimeUdf", SqlTransform.query("SELECT PRE_HOUR(f_time) as result_time FROM PCOLLECTION WHERE f_int=1").registerUdf("PRE_HOUR", PreviousHour.class))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addField("result_time", Schema.FieldType.logicalType(SqlTypes.TIME)).build()).addValues(new Object[]{LocalTime.of(0, 1, 3)}).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testTimestampUdf() throws Exception {
        PAssert.that(this.boundedInput1.apply("testTimeUdf", SqlTransform.query("SELECT PRE_DAY(f_timestamp) as result_time FROM PCOLLECTION WHERE f_int=1").registerUdf("PRE_DAY", PreviousDay.class))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addDateTimeField("result_time").build()).addValues(new Object[]{DateTimeUtils.parseTimestampWithoutTimeZone("2016-12-31 01:01:03")}).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testUdafWithMapOutput() throws Exception {
        Schema build = Schema.builder().addInt32Field("f_int2").addMapField("squareAndAccumulateInMap", Schema.FieldType.STRING, Schema.FieldType.INT32).build();
        HashMap hashMap = new HashMap();
        hashMap.put("squareOf-1", 1);
        hashMap.put("squareOf-2", 4);
        hashMap.put("squareOf-3", 9);
        hashMap.put("squareOf-4", 16);
        PAssert.that(this.boundedInput1.apply("testUdafWithMapOutput", SqlTransform.query("SELECT f_int2,squareAndAccumulateInMap(f_int) AS `squareAndAccumulateInMap` FROM PCOLLECTION GROUP BY f_int2").registerUdaf("squareAndAccumulateInMap", new SquareAndAccumulateInMap()))).containsInAnyOrder(new Row[]{Row.withSchema(build).addValues(new Object[]{0, hashMap}).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testUdafWithListOutput() throws Exception {
        PAssert.that(this.boundedInput1.apply("testUdafWithListOutput", SqlTransform.query("SELECT f_int2,squareAndAccumulateInList(f_int) AS `squareAndAccumulateInList` FROM PCOLLECTION GROUP BY f_int2").registerUdaf("squareAndAccumulateInList", new SquareAndAccumulateInList()))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt32Field("f_int2").addArrayField("squareAndAccumulateInList", Schema.FieldType.INT32).build()).addValue(0).addArray(Arrays.asList(1, 4, 9, 16)).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testUdfWithListOutput() throws Exception {
        PAssert.that(this.boundedInput1.apply("testArrayUdf", SqlTransform.query("SELECT test_array(1)").registerUdf("test_array", TestReturnTypeList.class))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addArrayField("array_field", Schema.FieldType.INT64).build()).addValue(Arrays.asList(1L)).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testUdfWithListInput() throws Exception {
        PAssert.that(this.boundedInput1.apply("testArrayUdf", SqlTransform.query("select array_length(ARRAY[1, 2, 3])").registerUdf("array_length", TestListLength.class))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt32Field("int_field").build()).addValue(3).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testUdafMultiLevelDescendent() {
        PAssert.that(this.boundedInput1.apply("testUdaf", SqlTransform.query("SELECT f_int2, double_square_sum(f_int) AS `squaresum` FROM PCOLLECTION GROUP BY f_int2").registerUdaf("double_square_sum", new SquareSquareSum()))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt32Field("f_int2").addInt32Field("squaresum").build()).addValues(new Object[]{0, Integer.valueOf(BeamSqlParserImplConstants.NOT)}).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testRawCombineFnSubclass() {
        this.exceptions.expect(ParseException.class);
        this.exceptions.expectCause(ThrowableMessageMatcher.hasMessage(Matchers.containsString("CombineFn must be parameterized")));
        this.pipeline.enableAbandonedNodeEnforcement(false);
        this.boundedInput1.apply("testUdaf", SqlTransform.query("SELECT f_int2, squaresum(f_int) AS `squaresum` FROM PCOLLECTION GROUP BY f_int2").registerUdaf("squaresum", new RawCombineFn()));
    }

    @Test
    public void testBeamSqlUdf() throws Exception {
        PAssert.that(this.boundedInput1.apply("testUdf", SqlTransform.query("SELECT f_int, cubic(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2").registerUdf("cubic", CubicInteger.class))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt32Field("f_int").addInt32Field("cubicvalue").build()).addValues(new Object[]{2, 8}).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testSerializableFunctionUdf() throws Exception {
        PAssert.that(PCollectionTuple.of(new TupleTag("PCOLLECTION"), this.boundedInput1).apply("testUdf", SqlTransform.query("SELECT f_int, cubic(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2").registerUdf("cubic", new CubicIntegerFn()))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt32Field("f_int").addInt32Field("cubicvalue").build()).addValues(new Object[]{2, 8}).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testBeamSqlUdfWithDefaultParameters() throws Exception {
        PAssert.that(PCollectionTuple.of(new TupleTag("PCOLLECTION"), this.boundedInput1).apply("testUdf", SqlTransform.query("SELECT f_int, substr(f_string) as sub_string FROM PCOLLECTION WHERE f_int = 2").registerUdf("substr", UdfFnWithDefault.class))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt32Field("f_int").addStringField("sub_string").build()).addValues(new Object[]{2, "s"}).build()});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testTableMacroUdf() throws Exception {
        Schema of = Schema.of(new Schema.Field[]{Schema.Field.of("f0", Schema.FieldType.INT32)});
        PAssert.that(this.pipeline.apply(SqlTransform.query("SELECT * FROM table(range_udf(0, 3))").registerUdf("range_udf", RangeUdf.class))).containsInAnyOrder(new Row[]{Row.withSchema(of).addValue(0).build(), Row.withSchema(of).addValue(1).build(), Row.withSchema(of).addValue(2).build()});
        this.pipeline.run();
    }

    @Test
    public void testAutoLoadedUdfUdaf() throws Exception {
        PAssert.that(this.boundedInput1.apply("testUdaf", SqlTransform.query("SELECT f_int2, autoload_squaresum(autoload_cubic(f_int)) AS `autoload_squarecubicsum` FROM PCOLLECTION GROUP BY f_int2"))).containsInAnyOrder(new Row[]{Row.withSchema(Schema.builder().addInt32Field("f_int2").addInt32Field("autoload_squarecubicsum").build()).addValues(new Object[]{0, 4890}).build()});
        this.pipeline.run().waitUntilFinish();
    }
}
