package org.apache.flink.table.utils;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

/* loaded from: input_file:org/apache/flink/table/utils/TestCollectionTableFactory.class */
public class TestCollectionTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public static boolean isStreaming = true;
    private static final LinkedList<Row> SOURCE_DATA = new LinkedList<>();
    private static final LinkedList<Row> DIM_DATA = new LinkedList<>();
    private static final LinkedList<Row> RESULT = new LinkedList<>();
    private static long emitIntervalMS = -1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/table/utils/TestCollectionTableFactory$CollectionTableSink.class */
    /**
     * Table sink whose runtime function appends every received row to the factory's static
     * result list (see {@link UnsafeMemorySinkFunction}).
     */
    public static class CollectionTableSink implements DynamicTableSink {
        private final DataType outputType;

        /**
         * @param dataType data type of the rows consumed by this sink
         */
        public CollectionTableSink(DataType dataType) {
            this.outputType = dataType;
        }

        /**
         * Accepts the requested changelog mode as-is.
         *
         * @param changelogMode mode requested by the caller
         * @return the very same {@code changelogMode}
         */
        public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
            return changelogMode;
        }

        /**
         * Builds a provider that attaches an {@link UnsafeMemorySinkFunction} to the incoming
         * stream and pins that sink to parallelism 1.
         *
         * @param context used to derive the type information and the data structure converter
         *     for {@code outputType}
         * @return a {@link DataStreamSinkProvider} wrapping the in-memory sink function
         */
        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            final TypeInformation<Row> rowTypeInfo = context.createTypeInformation(outputType);
            final DynamicTableSink.DataStructureConverter converter =
                    context.createDataStructureConverter(outputType);
            return new DataStreamSinkProvider() {
                public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                    UnsafeMemorySinkFunction sinkFunction =
                            new UnsafeMemorySinkFunction(rowTypeInfo, converter);
                    DataStreamSink<RowData> sink = dataStream.addSink(sinkFunction);
                    return sink.setParallelism(1);
                }
            };
        }

        /**
         * @return a new sink configured with the same output type
         */
        public DynamicTableSink copy() {
            return new CollectionTableSink(outputType);
        }

        /**
         * @return a short description of this sink that includes its output type
         */
        public String asSummaryString() {
            return String.format("CollectionTableSink(%s)", outputType);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/table/utils/TestCollectionTableFactory$CollectionTableSource.class */
    /**
     * Table source backed by the factory's static row lists: scans read {@code SOURCE_DATA},
     * lookups read {@code DIM_DATA}.
     */
    public static class CollectionTableSource implements ScanTableSource, LookupTableSource {
        private final Long emitIntervalMS;
        private final DataType rowType;
        private final boolean isStreaming;
        private final Optional<Integer> parallelism;

        /**
         * @param l interval in milliseconds handed to {@link TestCollectionInputFormat}; values
         *     that are not positive disable the delay there
         * @param dataType data type of the rows produced by this source
         * @param z {@code true} if the scan should be reported as unbounded
         * @param optional explicit parallelism for the scan source, or empty to leave it unset
         */
        public CollectionTableSource(Long l, DataType dataType, boolean z, Optional<Integer> optional) {
            this.emitIntervalMS = l;
            this.rowType = dataType;
            this.isStreaming = z;
            this.parallelism = optional;
        }

        /**
         * @return a new source carrying the same configuration
         */
        public DynamicTableSource copy() {
            return new CollectionTableSource(this.emitIntervalMS, this.rowType, this.isStreaming, this.parallelism);
        }

        /**
         * @return the fixed summary string of this source
         */
        public String asSummaryString() {
            return "CollectionTableSource";
        }

        /**
         * Creates a lookup provider over {@code DIM_DATA}.
         *
         * <p>Only the first index of every key path is used, i.e. keys are resolved against
         * top-level fields.
         *
         * @param lookupContext supplies the lookup key paths
         * @return a provider wrapping a {@link TemporalTableFetcher}
         */
        public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext lookupContext) {
            int[] topLevelKeys = Arrays.stream(lookupContext.getKeys())
                    .mapToInt(keyPath -> keyPath[0])
                    .toArray();
            return TableFunctionProvider.of(
                    new TemporalTableFetcher(TestCollectionTableFactory.DIM_DATA, topLevelKeys));
        }

        /**
         * @return insert-only changelog mode
         */
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }

        /**
         * Creates a scan provider that emits the rows held in {@code SOURCE_DATA}.
         *
         * <p>The rows are converted to their internal representation eagerly, while this method
         * runs; rows added to {@code SOURCE_DATA} afterwards are not seen by the returned
         * provider.
         *
         * @param scanContext used to derive type information and the data structure converter
         *     for {@code rowType}
         * @return a {@link DataStreamScanProvider} reading the converted rows
         */
        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
            final TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(this.rowType);
            final TypeSerializer<RowData> serializer = typeInfo.createSerializer(new SerializerConfigImpl());
            final DynamicTableSource.DataStructureConverter converter =
                    scanContext.createDataStructureConverter(this.rowType);
            final List<RowData> rows = TestCollectionTableFactory.SOURCE_DATA.stream()
                    .map(row -> (RowData) converter.toInternal(row))
                    .collect(Collectors.toList());
            return new DataStreamScanProvider() {
                public DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment streamExecutionEnvironment) {
                    DataStreamSource<RowData> source = streamExecutionEnvironment.createInput(
                            new TestCollectionInputFormat<>(
                                    CollectionTableSource.this.emitIntervalMS.longValue(), rows, serializer),
                            typeInfo);
                    // Apply the parallelism only when one was configured; otherwise the
                    // source keeps whatever the environment assigns.
                    CollectionTableSource.this.parallelism.ifPresent(source::setParallelism);
                    return source;
                }

                public boolean isBounded() {
                    return !CollectionTableSource.this.isStreaming;
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/flink/table/utils/TestCollectionTableFactory$TemporalTableFetcher.class */
    /**
     * Lookup function that walks the given rows and emits a copy of every row whose key fields
     * equal the lookup arguments.
     */
    static class TemporalTableFetcher extends TableFunction<Row> {
        private final LinkedList<Row> dimData;
        private final int[] keyes;

        /**
         * @param linkedList rows to search on every lookup
         * @param iArr field positions inside a row that form the lookup key, in argument order
         */
        public TemporalTableFetcher(LinkedList<Row> linkedList, int[] iArr) {
            this.dimData = linkedList;
            this.keyes = iArr;
        }

        /**
         * Emits a copy of each row whose key fields match {@code objArr} position by position.
         *
         * @param objArr lookup values; element {@code i} is compared with the row field at
         *     {@code keyes[i]}
         */
        public void eval(Object... objArr) {
            for (Row candidate : dimData) {
                if (matchesKeys(candidate, objArr)) {
                    collect(copyFields(candidate));
                }
            }
        }

        /** Returns whether every key field of {@code candidate} equals its lookup value (null matches null). */
        private boolean matchesKeys(Row candidate, Object[] lookupValues) {
            for (int pos = 0; pos < keyes.length; pos++) {
                Object stored = candidate.getField(keyes[pos]);
                Object wanted = lookupValues[pos];
                boolean same;
                if (stored == null) {
                    same = wanted == null;
                } else {
                    same = stored.equals(wanted);
                }
                if (!same) {
                    return false;
                }
            }
            return true;
        }

        /** Creates a new row of the same arity holding the same field values as {@code original}. */
        private Row copyFields(Row original) {
            Row duplicate = new Row(original.getArity());
            for (int pos = 0; pos < original.getArity(); pos++) {
                duplicate.setField(pos, original.getField(pos));
            }
            return duplicate;
        }
    }

    /* loaded from: input_file:org/apache/flink/table/utils/TestCollectionTableFactory$TestCollectionInputFormat.class */
    /**
     * Collection input format that can pause before each end-of-input check, to slow down
     * emission.
     *
     * @param <T> element type of the backing collection
     */
    static class TestCollectionInputFormat<T> extends CollectionInputFormat<T> {
        private final long emitIntervalMs;

        /**
         * @param j pause in milliseconds applied on every {@link #reachedEnd()} call; values
         *     that are not positive disable the pause
         * @param collection elements to emit
         * @param typeSerializer serializer for the elements
         */
        public TestCollectionInputFormat(long j, Collection<T> collection, TypeSerializer<T> typeSerializer) {
            super(collection, typeSerializer);
            this.emitIntervalMs = j;
        }

        /**
         * Sleeps for the configured interval (if positive) and then delegates to the parent.
         *
         * <p>If the sleep is interrupted, the interrupt flag is restored so the calling thread
         * can still observe it; the end-of-input check itself is performed regardless.
         *
         * @return the result of the parent's end-of-input check
         * @throws IOException if the parent's check fails
         */
        public boolean reachedEnd() throws IOException {
            if (this.emitIntervalMs > 0) {
                try {
                    Thread.sleep(this.emitIntervalMs);
                } catch (InterruptedException e) {
                    // Do not swallow the interruption: put the flag back so that callers
                    // relying on it still see it.
                    Thread.currentThread().interrupt();
                }
            }
            return super.reachedEnd();
        }
    }

    /* loaded from: input_file:org/apache/flink/table/utils/TestCollectionTableFactory$UnsafeMemorySinkFunction.class */
    /**
     * Sink function that converts each incoming record to an external {@link Row}, copies it
     * with the row serializer and appends the copy to the factory's static result list. This
     * class performs no synchronization around that list.
     */
    static class UnsafeMemorySinkFunction extends RichSinkFunction<RowData> {
        private TypeSerializer<Row> serializer;
        private final TypeInformation<Row> outputType;
        private final DynamicTableSink.DataStructureConverter converter;

        /**
         * @param typeInformation type information used to create the row serializer on open
         * @param dataStructureConverter converter from internal records to external rows
         */
        public UnsafeMemorySinkFunction(TypeInformation<Row> typeInformation, DynamicTableSink.DataStructureConverter dataStructureConverter) {
            this.outputType = typeInformation;
            this.converter = dataStructureConverter;
        }

        /**
         * Creates the serializer that {@code invoke} uses to copy rows.
         *
         * @param openContext not used by this implementation
         */
        public void open(OpenContext openContext) throws Exception {
            SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
            this.serializer = outputType.createSerializer(serializerConfig);
        }

        /**
         * Stores a serializer-made copy of the converted record in the shared result list.
         *
         * @param rowData incoming record in internal format
         * @param context not used by this implementation
         */
        public void invoke(RowData rowData, SinkFunction.Context context) throws Exception {
            Row external = (Row) converter.toExternal(rowData);
            Row detached = serializer.copy(external);
            TestCollectionTableFactory.RESULT.add(detached);
        }
    }

    /**
     * Appends rows to the shared source and dimension lists and sets the emit interval.
     *
     * <p>Existing rows are kept; call {@code reset()} first to start from empty lists.
     *
     * @param list rows to append to the source data
     * @param list2 rows to append to the dimension (lookup) data
     * @param l emit interval in milliseconds, or {@code null} to store {@code -1}
     */
    public static void initData(List<Row> list, List<Row> list2, Long l) {
        SOURCE_DATA.addAll(list);
        DIM_DATA.addAll(list2);
        if (l == null) {
            emitIntervalMS = -1L;
        } else {
            emitIntervalMS = l.longValue();
        }
    }

    /**
     * Empties the shared source, dimension and result lists and restores the emit interval to
     * {@code -1}.
     */
    public static void reset() {
        emitIntervalMS = -1L;
        SOURCE_DATA.clear();
        DIM_DATA.clear();
        RESULT.clear();
    }

    /**
     * Creates a collection source for the given table.
     *
     * <p>The source uses the currently configured emit interval, the table's source row data
     * type and, if the table options contain {@code "parallelism"}, that value parsed as an
     * integer.
     *
     * @param resolvedCatalogTable table whose schema and options configure the source
     * @param z {@code true} to create a streaming (unbounded) source
     * @return the configured source
     * @throws NumberFormatException if the {@code "parallelism"} option is present but is not a
     *     valid integer
     */
    public static CollectionTableSource getCollectionSource(ResolvedCatalogTable resolvedCatalogTable, boolean z) {
        String configuredParallelism = resolvedCatalogTable.getOptions().get("parallelism");
        Long interval = Long.valueOf(emitIntervalMS);
        DataType rowType = resolvedCatalogTable.getResolvedSchema().toSourceRowDataType();
        Optional<Integer> parallelism =
                Optional.ofNullable(configuredParallelism).map(Integer::parseInt);
        return new CollectionTableSource(interval, rowType, z, parallelism);
    }

    /**
     * Creates a collection sink that consumes rows of the given table's sink row data type.
     *
     * @param resolvedCatalogTable table whose resolved schema determines the sink type
     * @return the configured sink
     */
    public static CollectionTableSink getCollectionSink(ResolvedCatalogTable resolvedCatalogTable) {
        DataType sinkRowType = resolvedCatalogTable.getResolvedSchema().toSinkRowDataType();
        return new CollectionTableSink(sinkRowType);
    }

    /**
     * Creates the source for the table described by {@code context}, using the current value
     * of the static {@code isStreaming} flag.
     *
     * @param context factory context providing the catalog table
     * @return the collection-backed source
     */
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        ResolvedCatalogTable table = context.getCatalogTable();
        return getCollectionSource(table, isStreaming);
    }

    /**
     * @return the identifier of this factory, {@code "COLLECTION"}
     */
    public String factoryIdentifier() {
        return "COLLECTION";
    }

    /**
     * @return a new, empty, mutable set: this factory declares no required options
     */
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    /**
     * @return a new, empty, mutable set: this factory declares no optional options
     */
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    /**
     * Creates the sink for the table described by {@code context}.
     *
     * @param context factory context providing the catalog table
     * @return the collection-backed sink
     */
    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        ResolvedCatalogTable table = context.getCatalogTable();
        return getCollectionSink(table);
    }
}
