/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
import org.apache.flink.table.runtime.generated.GeneratedResultFutureWrapper;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.runtime.util.BaseRowHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class AsyncLookupJoinHarnessTest {
    private static final int ASYNC_BUFFER_CAPACITY = 100;
    private static final int ASYNC_TIMEOUT_MS = 3000;
    private final TypeSerializer<BaseRow> inSerializer = new BaseRowSerializer(new ExecutionConfig(), new LogicalType[]{new IntType(), new VarCharType(Integer.MAX_VALUE)});
    private final BaseRowHarnessAssertor assertor = new BaseRowHarnessAssertor(new TypeInformation[]{Types.INT, Types.STRING, Types.INT, Types.STRING});
    private BaseRowTypeInfo rightRowTypeInfo;
    private TypeInformation<?> fetcherReturnType = this.rightRowTypeInfo = new BaseRowTypeInfo(new LogicalType[]{new IntType(), new VarCharType(Integer.MAX_VALUE)});

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTemporalInnerAsyncJoin() throws Exception {
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = this.createHarness(JoinType.INNER_JOIN, FilterOnTable.WITHOUT_FILTER);
        testHarness.open();
        Object object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.processElement(StreamRecordUtils.record(1, "a"));
            testHarness.processElement(StreamRecordUtils.record(2, "b"));
            testHarness.processElement(StreamRecordUtils.record(3, "c"));
            testHarness.processElement(StreamRecordUtils.record(4, "d"));
            testHarness.processElement(StreamRecordUtils.record(5, "e"));
        }
        object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.close();
        }
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.record(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.record(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.record(4, "d", 4, "Fabian"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTemporalInnerAsyncJoinWithFilter() throws Exception {
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = this.createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER);
        testHarness.open();
        Object object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.processElement(StreamRecordUtils.record(1, "a"));
            testHarness.processElement(StreamRecordUtils.record(2, "b"));
            testHarness.processElement(StreamRecordUtils.record(3, "c"));
            testHarness.processElement(StreamRecordUtils.record(4, "d"));
            testHarness.processElement(StreamRecordUtils.record(5, "e"));
        }
        object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.close();
        }
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.record(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.record(4, "d", 4, "Fabian"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTemporalLeftAsyncJoin() throws Exception {
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = this.createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITHOUT_FILTER);
        testHarness.open();
        Object object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.processElement(StreamRecordUtils.record(1, "a"));
            testHarness.processElement(StreamRecordUtils.record(2, "b"));
            testHarness.processElement(StreamRecordUtils.record(3, "c"));
            testHarness.processElement(StreamRecordUtils.record(4, "d"));
            testHarness.processElement(StreamRecordUtils.record(5, "e"));
        }
        object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.close();
        }
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.record(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.record(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.record(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.record(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.record(5, "e", null, null));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTemporalLeftAsyncJoinWithFilter() throws Exception {
        OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = this.createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER);
        testHarness.open();
        Object object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.processElement(StreamRecordUtils.record(1, "a"));
            testHarness.processElement(StreamRecordUtils.record(2, "b"));
            testHarness.processElement(StreamRecordUtils.record(3, "c"));
            testHarness.processElement(StreamRecordUtils.record(4, "d"));
            testHarness.processElement(StreamRecordUtils.record(5, "e"));
        }
        object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.close();
        }
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.record(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.record(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.record(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.record(5, "e", null, null));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    private OneInputStreamOperatorTestHarness<BaseRow, BaseRow> createHarness(JoinType joinType, FilterOnTable filterOnTable) throws Exception {
        boolean isLeftJoin = joinType == JoinType.LEFT_JOIN;
        Object joinRunner = filterOnTable == FilterOnTable.WITHOUT_FILTER ? new AsyncLookupJoinRunner(new GeneratedFunctionWrapper<TestingFetcherFunction>(new TestingFetcherFunction()), new GeneratedResultFutureWrapper<TestingFetcherResultFuture>(new TestingFetcherResultFuture()), this.fetcherReturnType, this.rightRowTypeInfo, isLeftJoin, 100) : new AsyncLookupJoinWithCalcRunner(new GeneratedFunctionWrapper<TestingFetcherFunction>(new TestingFetcherFunction()), new GeneratedFunctionWrapper<CalculateOnTemporalTable>(new CalculateOnTemporalTable()), new GeneratedResultFutureWrapper<TestingFetcherResultFuture>(new TestingFetcherResultFuture()), this.fetcherReturnType, this.rightRowTypeInfo, isLeftJoin, 100);
        return new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new AsyncWaitOperatorFactory((AsyncFunction)joinRunner, 3000L, 100, AsyncDataStream.OutputMode.ORDERED), this.inSerializer);
    }

    public static final class CalculateOnTemporalTable
    implements FlatMapFunction<BaseRow, BaseRow> {
        private static final long serialVersionUID = -1860345072157431136L;

        public void flatMap(BaseRow value, Collector<BaseRow> out) throws Exception {
            BinaryString name = value.getString(1);
            if (name.getSizeInBytes() >= 6) {
                out.collect((Object)value);
            }
        }
    }

    public static final class TestingFetcherResultFuture
    extends TableFunctionResultFuture<BaseRow> {
        private static final long serialVersionUID = -312754413938303160L;

        public void complete(Collection<BaseRow> result) {
            this.getResultFuture().complete(result);
        }
    }

    public static final class TestingFetcherFunction
    extends AbstractRichFunction
    implements AsyncFunction<BaseRow, BaseRow> {
        private static final long serialVersionUID = 4018474964018227081L;
        private static final Map<Integer, List<BaseRow>> data = new HashMap<Integer, List<BaseRow>>();
        private transient ExecutorService executor;

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.executor = Executors.newSingleThreadExecutor();
        }

        public void asyncInvoke(BaseRow input, ResultFuture<BaseRow> resultFuture) throws Exception {
            int id = input.getInt(0);
            CompletableFuture.supplyAsync(() -> data.get(id), this.executor).thenAcceptAsync(arg_0 -> resultFuture.complete(arg_0), (Executor)this.executor);
        }

        public void close() throws Exception {
            super.close();
            if (null != this.executor && !this.executor.isShutdown()) {
                this.executor.shutdown();
            }
        }

        static {
            data.put(1, Collections.singletonList(GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"Julian")})));
            data.put(3, Arrays.asList(GenericRow.of((Object[])new Object[]{3, BinaryString.fromString((String)"Jark")}), GenericRow.of((Object[])new Object[]{3, BinaryString.fromString((String)"Jackson")})));
            data.put(4, Collections.singletonList(GenericRow.of((Object[])new Object[]{4, BinaryString.fromString((String)"Fabian")})));
        }
    }

    private static enum FilterOnTable {
        WITH_FILTER,
        WITHOUT_FILTER;

    }

    private static enum JoinType {
        INNER_JOIN,
        LEFT_JOIN;

    }
}

