package org.apache.flink.table.runtime.operators.join;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
import org.apache.flink.table.runtime.generated.GeneratedResultFutureWrapper;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.Collector;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.class */
/**
 * Harness tests that run {@link AsyncLookupJoinRunner} and {@link AsyncLookupJoinWithCalcRunner}
 * inside an async wait operator (ordered output mode) against a small in-memory lookup table.
 *
 * <p>Every test feeds the same five input rows {@code (1,"a") .. (5,"e")}. The lookup table
 * ({@link TestingFetcherFunction}) only contains entries for the ids 1, 3 and 4.
 */
public class AsyncLookupJoinHarnessTest {
    /** Buffer capacity handed to both the join runner and the async wait operator factory. */
    private static final int ASYNC_BUFFER_CAPACITY = 100;

    /** Timeout, in milliseconds, handed to the async wait operator factory. */
    private static final int ASYNC_TIMEOUT_MS = 3000;

    /** Serializer for the input rows: (INT, VARCHAR). */
    private final TypeSerializer<RowData> inSerializer = new RowDataSerializer(new ExecutionConfig(), new LogicalType[]{new IntType(), new VarCharType(Integer.MAX_VALUE)});

    /** Output assertor configured with the field types INT, STRING, INT, STRING. */
    private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(new TypeInformation[]{Types.INT, Types.STRING, Types.INT, Types.STRING});

    /** Row type of the lookup side: (INT, VARCHAR). */
    private final RowDataTypeInfo rightRowTypeInfo = new RowDataTypeInfo(new LogicalType[]{new IntType(), new VarCharType(Integer.MAX_VALUE)});

    /** Return type passed to the runners for the fetcher; identical to the lookup-side row type. */
    private final TypeInformation<?> fetcherReturnType = this.rightRowTypeInfo;

    /**
     * Calc function used by the "with filter" tests: forwards a row only when its string field
     * at position 1 is at least 6 bytes long, and drops it otherwise.
     */
    public static final class CalculateOnTemporalTable implements FlatMapFunction<RowData, RowData> {
        private static final long serialVersionUID = -1860345072157431136L;

        @Override
        public void flatMap(RowData rowData, Collector<RowData> collector) throws Exception {
            if (rowData.getString(1).getSizeInBytes() >= 6) {
                collector.collect(rowData);
            }
        }
    }

    /** Selects whether the harness is built with or without the calc (filter) function. */
    public enum FilterOnTable {
        WITH_FILTER,
        WITHOUT_FILTER
    }

    /** Selects the join flavour; {@code LEFT_JOIN} sets the runner's left-join flag to true. */
    public enum JoinType {
        INNER_JOIN,
        LEFT_JOIN
    }

    /**
     * Async fetcher backed by a static in-memory map. The int in field 0 of the input row is
     * used as the lookup key; both the lookup and the completion of the result future are
     * scheduled on a single-thread executor that is created in {@link #open(Configuration)}.
     */
    public static final class TestingFetcherFunction extends AbstractRichFunction implements AsyncFunction<RowData, RowData> {
        private static final long serialVersionUID = 4018474964018227081L;

        /** Lookup table: id -> matching rows. Filled once in the static initializer, then only read. */
        private static final Map<Integer, List<RowData>> data = new HashMap<>();

        private transient ExecutorService executor;

        static {
            data.put(1, Collections.singletonList(GenericRowData.of(new Object[]{1, StringData.fromString("Julian")})));
            data.put(3, Arrays.asList(GenericRowData.of(new Object[]{3, StringData.fromString("Jark")}), GenericRowData.of(new Object[]{3, StringData.fromString("Jackson")})));
            data.put(4, Collections.singletonList(GenericRowData.of(new Object[]{4, StringData.fromString("Fabian")})));
        }

        @Override
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            executor = Executors.newSingleThreadExecutor();
        }

        /**
         * Looks up the rows for the id in field 0 of {@code rowData} and completes
         * {@code resultFuture} with them.
         *
         * <p>For ids without an entry the map lookup yields {@code null}, and that {@code null}
         * is handed to {@code resultFuture.complete} unchanged.
         */
        @Override
        public void asyncInvoke(RowData rowData, ResultFuture<RowData> resultFuture) throws Exception {
            final int id = rowData.getInt(0);
            CompletableFuture<List<RowData>> lookup = CompletableFuture.supplyAsync(() -> data.get(id), executor);
            // Creating the bound method reference already fails fast on a null resultFuture.
            lookup.thenAcceptAsync(resultFuture::complete, executor);
        }

        /** Shuts the executor down if it was created and has not been shut down yet. */
        @Override
        public void close() throws Exception {
            super.close();
            if (executor != null && !executor.isShutdown()) {
                executor.shutdown();
            }
        }
    }

    /** Result future that passes the fetched rows straight on to the wrapped result future. */
    public static final class TestingFetcherResultFuture extends TableFunctionResultFuture<RowData> {
        private static final long serialVersionUID = -312754413938303160L;

        @Override
        public void complete(Collection<RowData> collection) {
            getResultFuture().complete(collection);
        }
    }

    @Test
    public void testTemporalInnerAsyncJoin() throws Exception {
        List<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        assertJoinOutput(JoinType.INNER_JOIN, FilterOnTable.WITHOUT_FILTER, expectedOutput);
    }

    @Test
    public void testTemporalInnerAsyncJoinWithFilter() throws Exception {
        List<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        assertJoinOutput(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, expectedOutput);
    }

    @Test
    public void testTemporalLeftAsyncJoin() throws Exception {
        List<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(5, "e", null, null));
        assertJoinOutput(JoinType.LEFT_JOIN, FilterOnTable.WITHOUT_FILTER, expectedOutput);
    }

    @Test
    public void testTemporalLeftAsyncJoinWithFilter() throws Exception {
        List<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(5, "e", null, null));
        assertJoinOutput(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, expectedOutput);
    }

    /**
     * Builds a harness for the given configuration, feeds the five input rows
     * {@code (1,"a") .. (5,"e")}, ends the input, closes the harness and compares its output
     * with {@code expectedOutput}.
     *
     * @param joinType inner or left join
     * @param filterOnTable whether the calc (filter) runner is used
     * @param expectedOutput the records the harness is expected to have emitted
     * @throws Exception if the harness fails while opening, processing or closing
     */
    private void assertJoinOutput(JoinType joinType, FilterOnTable filterOnTable, List<Object> expectedOutput) throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createHarness(joinType, filterOnTable);
        testHarness.open();

        synchronized (testHarness.getCheckpointLock()) {
            testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        }

        // Deliberately a second, separate synchronized section rather than one merged block.
        // NOTE(review): presumably the lock is released in between on purpose - confirm before merging.
        synchronized (testHarness.getCheckpointLock()) {
            testHarness.endInput();
            testHarness.close();
        }

        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Creates a test harness around an async wait operator factory (ordered output mode) that
     * wraps either {@link AsyncLookupJoinRunner} or, when a filter is requested,
     * {@link AsyncLookupJoinWithCalcRunner}.
     *
     * @param joinType {@code LEFT_JOIN} sets the runner's left-join flag, otherwise it is false
     * @param filterOnTable {@code WITHOUT_FILTER} selects the plain runner, otherwise the calc runner
     * @return a harness that has not been opened yet
     * @throws Exception if the harness cannot be constructed
     */
    // NOTE(review): the Generated*Wrapper instances are left raw on purpose; their generic
    // parameters are not visible in this file, so no type arguments are guessed here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(JoinType joinType, FilterOnTable filterOnTable) throws Exception {
        boolean isLeftJoin = joinType == JoinType.LEFT_JOIN;

        AsyncFunction<RowData, RowData> joinRunner;
        if (filterOnTable == FilterOnTable.WITHOUT_FILTER) {
            joinRunner = new AsyncLookupJoinRunner(
                    new GeneratedFunctionWrapper(new TestingFetcherFunction()),
                    new GeneratedResultFutureWrapper(new TestingFetcherResultFuture()),
                    this.fetcherReturnType,
                    this.rightRowTypeInfo,
                    isLeftJoin,
                    ASYNC_BUFFER_CAPACITY);
        } else {
            joinRunner = new AsyncLookupJoinWithCalcRunner(
                    new GeneratedFunctionWrapper(new TestingFetcherFunction()),
                    new GeneratedFunctionWrapper(new CalculateOnTemporalTable()),
                    new GeneratedResultFutureWrapper(new TestingFetcherResultFuture()),
                    this.fetcherReturnType,
                    this.rightRowTypeInfo,
                    isLeftJoin,
                    ASYNC_BUFFER_CAPACITY);
        }

        AsyncWaitOperatorFactory<RowData, RowData> operatorFactory = new AsyncWaitOperatorFactory<>(
                joinRunner,
                ASYNC_TIMEOUT_MS,
                ASYNC_BUFFER_CAPACITY,
                AsyncDataStream.OutputMode.ORDERED);
        return new OneInputStreamOperatorTestHarness<>(operatorFactory, this.inSerializer);
    }
}
