package org.apache.flink.table.runtime.operators.join.lookup;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/AsyncLookupJoinWithCalcRunner.class */
public class AsyncLookupJoinWithCalcRunner extends AsyncLookupJoinRunner {
    private static final long serialVersionUID = 8758670006385551407L;
    private final GeneratedFunction<FlatMapFunction<BaseRow, BaseRow>> generatedCalc;
    private final BaseRowTypeInfo rightRowTypeInfo;
    private transient TypeSerializer<BaseRow> rightSerializer;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/AsyncLookupJoinWithCalcRunner$CalcCollectionCollector.class */
    private class CalcCollectionCollector implements Collector<BaseRow> {
        Collection<BaseRow> collection;

        private CalcCollectionCollector() {
        }

        public void reset() {
            this.collection = new ArrayList();
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void collect(BaseRow baseRow) {
            this.collection.add(AsyncLookupJoinWithCalcRunner.this.rightSerializer.copy(baseRow));
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.class */
    private class TemporalTableCalcResultFuture extends TableFunctionResultFuture<BaseRow> {
        private static final long serialVersionUID = -6360673852888872924L;
        private final FlatMapFunction<BaseRow, BaseRow> calc;
        private final TableFunctionResultFuture<BaseRow> joinConditionResultFuture;
        private final CalcCollectionCollector calcCollector;

        private TemporalTableCalcResultFuture(FlatMapFunction<BaseRow, BaseRow> flatMapFunction, TableFunctionResultFuture<BaseRow> tableFunctionResultFuture) {
            this.calcCollector = new CalcCollectionCollector();
            this.calc = flatMapFunction;
            this.joinConditionResultFuture = tableFunctionResultFuture;
        }

        @Override // org.apache.flink.table.runtime.collector.TableFunctionResultFuture
        public void setInput(Object obj) {
            this.joinConditionResultFuture.setInput(obj);
            this.calcCollector.reset();
        }

        @Override // org.apache.flink.table.runtime.collector.TableFunctionResultFuture
        public void setResultFuture(ResultFuture<?> resultFuture) {
            this.joinConditionResultFuture.setResultFuture(resultFuture);
        }

        public void complete(Collection<BaseRow> collection) {
            if (collection == null || collection.size() == 0) {
                this.joinConditionResultFuture.complete(collection);
                return;
            }
            Iterator<BaseRow> it = collection.iterator();
            while (it.hasNext()) {
                try {
                    this.calc.flatMap(it.next(), this.calcCollector);
                } catch (Exception e) {
                    this.joinConditionResultFuture.completeExceptionally(e);
                }
            }
            this.joinConditionResultFuture.complete(this.calcCollector.collection);
        }

        public void close() throws Exception {
            super.close();
            this.joinConditionResultFuture.close();
            FunctionUtils.closeFunction(this.calc);
        }
    }

    public AsyncLookupJoinWithCalcRunner(GeneratedFunction<AsyncFunction<BaseRow, Object>> generatedFunction, GeneratedFunction<FlatMapFunction<BaseRow, BaseRow>> generatedFunction2, GeneratedResultFuture<TableFunctionResultFuture<BaseRow>> generatedResultFuture, TypeInformation<?> typeInformation, BaseRowTypeInfo baseRowTypeInfo, boolean z, int i) {
        super(generatedFunction, generatedResultFuture, typeInformation, baseRowTypeInfo, z, i);
        this.rightRowTypeInfo = baseRowTypeInfo;
        this.generatedCalc = generatedFunction2;
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.generatedCalc.compile(getRuntimeContext().getUserCodeClassLoader());
        this.rightSerializer = this.rightRowTypeInfo.m5730createSerializer(getRuntimeContext().getExecutionConfig());
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner
    public TableFunctionResultFuture<BaseRow> createFetcherResultFuture(Configuration configuration) throws Exception {
        TableFunctionResultFuture<BaseRow> createFetcherResultFuture = super.createFetcherResultFuture(configuration);
        FlatMapFunction flatMapFunction = (FlatMapFunction) this.generatedCalc.newInstance(getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(flatMapFunction, getRuntimeContext());
        FunctionUtils.openFunction(flatMapFunction, configuration);
        return new TemporalTableCalcResultFuture(flatMapFunction, createFetcherResultFuture);
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner
    public void close() throws Exception {
        super.close();
    }
}
