package org.apache.flink.table.planner.codegen;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.planner.calcite.SqlToRexConverter;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.PlannerMocks;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.class */
public class AsyncCodeGeneratorTest {
    private static final RowType INPUT_TYPE = RowType.of(new LogicalType[]{new IntType(), new BigIntType(), new VarCharType()});
    private PlannerMocks plannerMocks;
    private SqlToRexConverter converter;
    private RelDataType tableRowType;

    /* loaded from: input_file:org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest$AsyncFunc.class */
    public static final class AsyncFunc extends AsyncScalarFunction {
        public void eval(CompletableFuture<String> completableFuture, Integer num, Long l, String str) {
            completableFuture.complete("complete " + str + " " + (num.intValue() * num.intValue()) + " " + (2 * l.longValue()));
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest$AsyncFuncError.class */
    public static final class AsyncFuncError extends AsyncScalarFunction {
        public void eval(CompletableFuture<String> completableFuture, Integer num, Long l, String str) {
            completableFuture.completeExceptionally(new RuntimeException("Error!"));
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest$TestResultFuture.class */
    public static final class TestResultFuture implements ResultFuture<RowData> {
        CompletableFuture<Collection<RowData>> data = new CompletableFuture<>();

        public void complete(Collection<RowData> collection) {
            this.data.complete(collection);
        }

        public void completeExceptionally(Throwable th) {
            this.data.completeExceptionally(th);
        }

        public CompletableFuture<Collection<RowData>> getResult() {
            return this.data;
        }
    }

    @BeforeEach
    public void before() {
        this.plannerMocks = PlannerMocks.create();
        this.tableRowType = this.plannerMocks.getPlannerContext().getTypeFactory().buildRelNodeRowType(JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")), JavaScalaConversionUtil.toScala(Arrays.asList(new IntType(), new BigIntType(), new VarCharType())));
        ShortcutUtils.unwrapContext(this.plannerMocks.getPlanner().createToRelContext().getCluster());
        this.converter = ShortcutUtils.unwrapContext(this.plannerMocks.getPlanner().createToRelContext().getCluster()).getRexFactory().createSqlToRexConverter(this.tableRowType, (RelDataType) null);
        this.plannerMocks.getFunctionCatalog().registerTemporarySystemFunction("myfunc", new AsyncFunc(), false);
        this.plannerMocks.getFunctionCatalog().registerTemporarySystemFunction("myfunc_error", new AsyncFuncError(), false);
    }

    @Test
    public void testStringReturnType() throws Exception {
        Assertions.assertThat(execute("myFunc(f1, f2, f3)", RowType.of(new LogicalType[]{new VarCharType()}), (RowData) GenericRowData.of(new Object[]{2, 3L, StringData.fromString("foo")}))).isEqualTo(GenericRowData.of(new Object[]{StringData.fromString("complete foo 4 6")}));
    }

    @Test
    public void testTwoReturnTypes_passThroughFirst() throws Exception {
        Assertions.assertThat(execute(Arrays.asList("f2", "myFunc(f1, f2, f3)"), RowType.of(new LogicalType[]{new VarCharType(), new BigIntType()}), (RowData) GenericRowData.of(new Object[]{2, 3L, StringData.fromString("foo")}))).isEqualTo(GenericRowData.of(new Object[]{3L, StringData.fromString("complete foo 4 6")}));
    }

    @Test
    public void testTwoReturnTypes_passThroughSecond() throws Exception {
        Assertions.assertThat(execute(Arrays.asList("myFunc(f1, f2, f3)", "f2"), RowType.of(new LogicalType[]{new VarCharType(), new BigIntType()}), (RowData) GenericRowData.of(new Object[]{2, 3L, StringData.fromString("foo")}))).isEqualTo(GenericRowData.of(new Object[]{StringData.fromString("complete foo 4 6"), 3L}));
    }

    @Test
    public void testError() throws Exception {
        CompletableFuture<Collection<RowData>> executeFuture = executeFuture(Arrays.asList("myFunc_error(f1, f2, f3)"), RowType.of(new LogicalType[]{new VarCharType(), new BigIntType()}), GenericRowData.of(new Object[]{2, 3L, StringData.fromString("foo")}));
        Assertions.assertThat(executeFuture).isCompletedExceptionally();
        executeFuture.getClass();
        Assertions.assertThatThrownBy(executeFuture::get).cause().hasMessage("Error!");
    }

    private RowData execute(String str, RowType rowType, RowData rowData) throws Exception {
        return execute(Arrays.asList(str), rowType, rowData);
    }

    private RowData execute(List<String> list, RowType rowType, RowData rowData) throws Exception {
        Collection<RowData> collection = executeFuture(list, rowType, rowData).get();
        Assertions.assertThat(collection).hasSize(1);
        return collection.iterator().next();
    }

    private CompletableFuture<Collection<RowData>> executeFuture(List<String> list, RowType rowType, RowData rowData) throws Exception {
        AsyncFunction asyncFunction = (AsyncFunction) AsyncCodeGenerator.generateFunction("name", INPUT_TYPE, rowType, (List) list.stream().map(str -> {
            return this.converter.convertToRexNode(str);
        }).collect(Collectors.toList()), true, new Configuration(), Thread.currentThread().getContextClassLoader()).newInstance(Thread.currentThread().getContextClassLoader());
        TestResultFuture testResultFuture = new TestResultFuture();
        asyncFunction.asyncInvoke(rowData, testResultFuture);
        return testResultFuture.getResult();
    }
}
