package org.apache.flink.table.planner.runtime.stream.table;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase.class */
public class AsyncCalcITCase extends StreamingTestBase {
    private TableEnvironment tEnv;

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase$AsyncFunc.class */
    /**
     * Async scalar function that turns an integer into the string {@code "val <num>"}.
     *
     * <p>The result is not produced inline: it is handed to the executor inherited from
     * {@link AsyncFuncBase} and completed after a 10 ms delay.
     */
    public static class AsyncFunc extends AsyncFuncBase {
        private static final long serialVersionUID = 1;

        /**
         * Schedules completion of the future with {@code "val " + num}.
         *
         * @param completableFuture future that receives the formatted value
         * @param num input value appended to the {@code "val "} prefix
         */
        public void eval(CompletableFuture<String> completableFuture, Integer num) {
            // The string is built when the scheduled task runs, not when eval is called.
            executor.schedule(
                    () -> completableFuture.complete("val " + num), 10L, TimeUnit.MILLISECONDS);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase$AsyncFuncAdd10.class */
    /**
     * Async scalar function that adds 10 to its integer argument.
     *
     * <p>The sum is computed on the executor inherited from {@link AsyncFuncBase} after a 10 ms
     * delay.
     */
    public static class AsyncFuncAdd10 extends AsyncFuncBase {
        private static final long serialVersionUID = 2;

        /**
         * Schedules completion of the future with {@code num + 10}.
         *
         * @param completableFuture future that receives the sum
         * @param num value to which 10 is added
         */
        public void eval(CompletableFuture<Integer> completableFuture, Integer num) {
            // Unboxing of num and the addition both happen inside the scheduled task.
            executor.schedule(
                    () -> completableFuture.complete(num + 10), 10L, TimeUnit.MILLISECONDS);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase$AsyncFuncBase.class */
    /**
     * Shared base for the async test functions in this file.
     *
     * <p>It owns a single-threaded scheduled executor that subclasses use to complete their
     * futures asynchronously. The executor is created in {@link #open(FunctionContext)} and
     * stopped in {@link #close()}.
     */
    public static class AsyncFuncBase extends AsyncScalarFunction {
        /** Executor used by subclasses to schedule work; {@code null} until {@code open} runs. */
        protected ScheduledExecutorService executor;

        /**
         * Creates the single-threaded scheduled executor.
         *
         * @param functionContext function context; not used here
         */
        public void open(FunctionContext functionContext) {
            executor = Executors.newSingleThreadScheduledExecutor();
        }

        /** Stops the executor, unless it was never created or has already been shut down. */
        public void close() {
            final boolean stillRunning = executor != null && !executor.isShutdown();
            if (stillRunning) {
                executor.shutdownNow();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase$AsyncFuncFail.class */
    /**
     * Async scalar function that fails a configurable number of invocations before succeeding.
     *
     * <p>Every call to {@code eval} bumps an invocation counter. While the counter is at most
     * {@code numFailures} the future is completed exceptionally; afterwards it is completed with
     * the current invocation count.
     */
    public static class AsyncFuncFail extends AsyncFuncBase implements Serializable {
        private static final long serialVersionUID = 8996145425452974113L;

        /** Number of leading invocations that complete exceptionally. */
        private final int numFailures;

        /** Total number of {@code eval} invocations seen so far. */
        private final AtomicInteger failures = new AtomicInteger(0);

        /**
         * @param i number of invocations that should fail before the first success
         */
        public AsyncFuncFail(int i) {
            this.numFailures = i;
        }

        /**
         * Completes the future exceptionally for the first {@code numFailures} invocations and
         * with the 1-based invocation number afterwards.
         *
         * @param completableFuture future to complete
         * @param i input value; ignored
         */
        public void eval(CompletableFuture<Integer> completableFuture, int i) {
            // Take one atomic snapshot of the counter and use it for both the decision and the
            // reported value, instead of incrementing and then re-reading the counter.
            final int attempt = failures.incrementAndGet();
            if (attempt <= numFailures) {
                completableFuture.completeExceptionally(new RuntimeException("Error " + attempt));
            } else {
                completableFuture.complete(attempt);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase$AsyncFuncGeneric.class */
    /**
     * Generic async function whose result is an array of {@code T}.
     *
     * <p>Subclasses supply the array through {@link #newT(int)}; this class only takes care of
     * completing the future on the inherited executor after a 10 ms delay.
     *
     * @param <T> element type of the produced array
     */
    public static abstract class AsyncFuncGeneric<T> extends AsyncFuncBase {
        private static final long serialVersionUID = 3;

        /**
         * Builds the result array for the given input.
         *
         * @param i unboxed input value passed to {@code eval}
         * @return the array the future is completed with
         */
        abstract T[] newT(int i);

        /**
         * Schedules completion of the future with the array returned by {@link #newT(int)}.
         *
         * @param completableFuture future that receives the array
         * @param num input value forwarded to {@code newT}
         */
        public void eval(CompletableFuture<T[]> completableFuture, Integer num) {
            // num is unboxed and newT is invoked inside the scheduled task.
            executor.schedule(
                    () -> completableFuture.complete(newT(num)), 10L, TimeUnit.MILLISECONDS);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase$AsyncFuncMoreGeneric.class */
    /**
     * Async function that is generic in the type of its first {@code eval} argument.
     *
     * <p>This class only schedules the call; what is done with the first argument is decided by
     * the subclass in {@link #finish(Object, int)}.
     *
     * @param <T> type of the first {@code eval} argument
     */
    public static abstract class AsyncFuncMoreGeneric<T> extends AsyncFuncBase {
        private static final long serialVersionUID = 3;

        /**
         * Handles the first {@code eval} argument for the given input.
         *
         * @param t first argument passed to {@code eval}
         * @param i unboxed input value passed to {@code eval}
         */
        abstract void finish(T t, int i);

        /**
         * Schedules a call to {@link #finish(Object, int)} on the inherited executor after 10 ms.
         *
         * @param t value handed through to {@code finish}
         * @param num input value handed through to {@code finish}
         */
        public void eval(T t, Integer num) {
            // num is unboxed inside the scheduled task.
            executor.schedule(() -> finish(t, num), 10L, TimeUnit.MILLISECONDS);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase$AsyncFuncOverload.class */
    /**
     * Async function with two {@code eval} overloads, one for integers and one for strings.
     *
     * <p>Each overload tags its result with the variant that was invoked, so a caller can tell
     * which one handled the argument.
     */
    public static class AsyncFuncOverload extends AsyncFuncBase {
        private static final long serialVersionUID = 3;

        /**
         * Schedules completion of the future with {@code "int version " + num}.
         *
         * @param completableFuture future that receives the tagged value
         * @param num integer argument
         */
        public void eval(CompletableFuture<String> completableFuture, Integer num) {
            executor.schedule(
                    () -> completableFuture.complete("int version " + num),
                    10L,
                    TimeUnit.MILLISECONDS);
        }

        /**
         * Schedules completion of the future with {@code "string version " + str}.
         *
         * @param completableFuture future that receives the tagged value
         * @param str string argument
         */
        public void eval(CompletableFuture<String> completableFuture, String str) {
            executor.schedule(
                    () -> completableFuture.complete("string version " + str),
                    10L,
                    TimeUnit.MILLISECONDS);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase$AsyncFuncRow.class */
    /**
     * Async function that returns a two-field row and completes its future synchronously.
     *
     * <p>For input {@code i} the row holds {@code i + 1} and the decimal string of {@code i * i}.
     */
    public static class AsyncFuncRow extends AsyncScalarFunction {
        /**
         * Completes the future immediately with the row for {@code i}.
         *
         * @param completableFuture future that receives the row
         * @param i input value
         */
        @DataTypeHint("ROW<f0 INT, f1 String>")
        public void eval(CompletableFuture<Row> completableFuture, int i) {
            final int successor = i + 1;
            final String square = String.valueOf(i * i);
            completableFuture.complete(Row.of(successor, square));
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase$LongAsyncFuncGeneric.class */
    /**
     * {@link AsyncFuncGeneric} specialisation whose result is a one-element {@code Long[]}.
     */
    public static class LongAsyncFuncGeneric extends AsyncFuncGeneric<Long> {
        /**
         * Returns a single-element array containing {@code 10 + i}.
         *
         * @param i input value
         * @return array with the one element {@code 10 + i}
         */
        @Override
        public Long[] newT(int i) {
            // Add in long arithmetic so the sum cannot wrap around as an int before widening.
            return new Long[] {10L + i};
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase$LongAsyncFuncMoreGeneric.class */
    /**
     * {@link AsyncFuncMoreGeneric} specialisation whose first {@code eval} argument is the result
     * future itself, typed as {@code CompletableFuture<Long[]>}.
     */
    public static class LongAsyncFuncMoreGeneric
            extends AsyncFuncMoreGeneric<CompletableFuture<Long[]>> {
        /**
         * Completes the future with a single-element array containing {@code 10 + i}.
         *
         * @param completableFuture future that receives the array
         * @param i input value
         */
        @Override
        public void finish(CompletableFuture<Long[]> completableFuture, int i) {
            // Add in long arithmetic so the sum cannot wrap around as an int before widening.
            completableFuture.complete(new Long[] {10L + i});
        }
    }

    /**
     * Creates a fresh streaming table environment for each test.
     *
     * <p>The environment runs with parallelism 2, an async scalar buffer capacity of 2 and an
     * async scalar timeout of one minute.
     */
    @Override // org.apache.flink.table.planner.runtime.utils.StreamingTestBase
    @BeforeEach
    public void before() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY, 2);
        tEnv.getConfig()
                .set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_TIMEOUT, Duration.ofMinutes(1L));
    }

    /** Selecting only the async function over a three-row table yields one result per row. */
    @Test
    public void testSimpleTableSelect() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(1, 2, 3).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFunc());

        final List<Row> actual = executeSql("select func(f1) from t1");

        final List<Row> expected = Arrays.asList(Row.of("val 1"), Row.of("val 2"), Row.of("val 3"));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /** A string literal can be projected next to the async function call. */
    @Test
    public void testLiteralPlusTableSelect() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(1, 2, 3).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFunc());

        final List<Row> actual = executeSql("select 'foo', func(f1) from t1");

        final List<Row> expected =
                Arrays.asList(
                        Row.of("foo", "val 1"), Row.of("foo", "val 2"), Row.of("foo", "val 3"));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /** The input column can be projected next to the async function call on that column. */
    @Test
    public void testFieldPlusTableSelect() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(1, 2, 3).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFunc());

        final List<Row> actual = executeSql("select f1, func(f1) from t1");

        final List<Row> expected =
                Arrays.asList(Row.of(1, "val 1"), Row.of(2, "val 2"), Row.of(3, "val 3"));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /** The same async function can be called twice in one projection. */
    @Test
    public void testTwoCalls() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(1, 2, 3).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFunc());

        final List<Row> actual = executeSql("select func(f1), func(f1) from t1");

        final List<Row> expected =
                Arrays.asList(
                        Row.of("val 1", "val 1"),
                        Row.of("val 2", "val 2"),
                        Row.of("val 3", "val 3"));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /** Async calls can be nested two and three levels deep within a single projection. */
    @Test
    public void testThreeNestedCalls() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(1, 2, 3).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFuncAdd10());

        final List<Row> actual =
                executeSql("select func(func(f1)), func(func(func(f1))), func(f1) from t1");

        // Columns are f1 + 20, f1 + 30 and f1 + 10 respectively.
        final List<Row> expected =
                Arrays.asList(Row.of(21, 31, 11), Row.of(22, 32, 12), Row.of(23, 33, 13));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /** The async result can be used as an argument of another function ({@code Concat}). */
    @Test
    public void testPassedToOtherUDF() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(1, 2, 3).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFunc());

        final List<Row> actual = executeSql("select Concat(func(f1), 'foo') from t1");

        final List<Row> expected =
                Arrays.asList(Row.of("val 1foo"), Row.of("val 2foo"), Row.of("val 3foo"));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /** The async function can be called on a literal in a query without a FROM clause. */
    @Test
    public void testJustCall() {
        // t1 is registered like in the other tests, although this query does not read from it.
        tEnv.createTemporaryView("t1", tEnv.fromValues(1, 2, 3).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFunc());

        final List<Row> actual = executeSql("select func(1)");

        final List<Row> expected = Collections.singletonList(Row.of("val 1"));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /** The async function can appear in both the WHERE condition and the projection. */
    @Test
    public void testWhereConditionAndProjection() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(1, 2, 3).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFunc());

        final List<Row> actual =
                executeSql("select func(f1) from t1 where REGEXP(func(f1), 'val (2|3)')");

        // The regular expression only lets the results for 2 and 3 through.
        final List<Row> expected = Arrays.asList(Row.of("val 2"), Row.of("val 3"));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /** A field of the row returned by the async function can be accessed directly. */
    @Test
    public void testFieldAccessAfter() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(2).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFuncRow());

        final List<Row> actual = executeSql("select func(f1).f0 from t1");

        // AsyncFuncRow puts i + 1 into f0, so the input 2 gives 3.
        final List<Row> expected = Collections.singletonList(Row.of(3));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /**
     * A second async function can consume a field of a row produced by a first async function
     * that is exposed through an intermediate view.
     */
    @Test
    public void testFieldOperand() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(2).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFuncRow());
        tEnv.createTemporarySystemFunction("func2", new AsyncFuncAdd10());
        tEnv.createTemporaryView("t2", tEnv.sqlQuery("select func(f1) from t1"));

        final List<Row> actual = executeSql("select func2(t2.f0) from t2");

        final List<Row> expected = Collections.singletonList(Row.of(13));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /** The matching {@code eval} overload is chosen for an integer and for a string argument. */
    @Test
    public void testOverload() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(1).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFuncOverload());

        final List<Row> actual = executeSql("select func(f1), func(cast(f1 as String)) from t1");

        final List<Row> expected =
                Collections.singletonList(Row.of("int version 1", "string version 1"));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /** A function whose result type comes from a generic superclass returns a {@code Long[]}. */
    @Test
    public void testMultiLayerGeneric() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(1).as("f1"));
        tEnv.createTemporarySystemFunction("func", new LongAsyncFuncGeneric());

        final List<Row> actual = executeSql("select func(f1) from t1");

        // The cast keeps the Long[] as one row field instead of spreading it over the varargs.
        final List<Row> expected = Collections.singletonList(Row.of((Object) new Long[] {11L}));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /** A function whose future parameter itself is a generic type argument returns a {@code Long[]}. */
    @Test
    public void testMultiLayerMoreGeneric() {
        tEnv.createTemporaryView("t1", tEnv.fromValues(1).as("f1"));
        tEnv.createTemporarySystemFunction("func", new LongAsyncFuncMoreGeneric());

        final List<Row> actual = executeSql("select func(f1) from t1");

        // The cast keeps the Long[] as one row field instead of spreading it over the varargs.
        final List<Row> expected = Collections.singletonList(Row.of((Object) new Long[] {11L}));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /**
     * A function that fails its first two invocations still produces a result.
     *
     * <p>The expected value 3 is the invocation count reported by {@link AsyncFuncFail}, so
     * {@code eval} must have run three times for the single input row. The re-invocation itself
     * is presumably done by the runtime's retry handling, which is not visible in this file.
     */
    @Test
    public void testFailures() {
        // Lower the buffer capacity set in before() to 1 for this test.
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY, 1);
        tEnv.createTemporaryView("t1", tEnv.fromValues(1).as("f1"));
        tEnv.createTemporarySystemFunction("func", new AsyncFuncFail(2));

        final List<Row> actual = executeSql("select func(f1) from t1");

        final List<Row> expected = Collections.singletonList(Row.of(3));
        Assertions.assertThat(actual).containsSequence(expected);
    }

    /**
     * Executes the given SQL statement and collects all result rows into a list.
     *
     * <p>The result iterator is drained completely and then closed so that the resources behind
     * the result are released.
     *
     * @param str SQL statement to execute
     * @return all rows of the result, in the order the iterator returned them
     * @throws IllegalStateException if closing the result iterator fails with a checked exception
     */
    private List<Row> executeSql(String str) {
        final TableResult result = tEnv.executeSql(str);
        final List<Row> rows = new ArrayList<>();
        try (CloseableIterator<Row> iterator = result.collect()) {
            iterator.forEachRemaining(rows::add);
        } catch (RuntimeException e) {
            // Failures while draining the result propagate to the test unchanged.
            throw e;
        } catch (Exception e) {
            // Only close() can raise a checked exception here.
            throw new IllegalStateException(
                    "Failed to close the result iterator of query: " + str, e);
        }
        return rows;
    }
}
