package org.apache.hudi.table;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
import org.apache.hudi.org.apache.hadoop.hbase.util.Strings;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.ExpressionUtils;
import org.apache.hudi.util.InputFormats;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/table/HoodieTableSource.class */
/**
 * Flink Table API scan source for a Hoodie table.
 *
 * <p>Supports partition, projection, limit and filter push-down. Bounded reads are served by an
 * {@link InputFormat} chosen from the configured query type; streaming reads chain a
 * parallelism-1 {@link StreamReadMonitoringFunction} (emitting {@link MergeOnReadInputSplit}s) with
 * a {@link StreamReadOperator} keyed by file id.
 */
public class HoodieTableSource implements ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown, SupportsFilterPushDown {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieTableSource.class);

    /** Sentinel stored in {@link #limit} when no LIMIT has been pushed down. */
    private static final int NO_LIMIT_CONSTANT = -1;

    private final transient Configuration hadoopConf;
    private final transient HoodieTableMetaClient metaClient;
    private final long maxCompactionMemoryInBytes;
    private final ResolvedSchema schema;
    /** Physical row type of the full (unprojected) table schema. */
    private final RowType tableRowType;
    private final Path path;
    private final List<String> partitionKeys;
    private final String defaultPartName;
    private final org.apache.flink.configuration.Configuration conf;
    private final FileIndex fileIndex;

    // Push-down state, updated by the planner through the Supports*PushDown callbacks.
    /** Indices (into the table schema) of the projected top-level fields. */
    private int[] requiredPos;
    /** Pushed-down LIMIT, or {@link #NO_LIMIT_CONSTANT}. */
    private long limit;
    /** Pushed-down filters that are simple call expressions; handed to the file index. */
    private List<ResolvedExpression> filters;
    /** Pruned partitions, or {@code null} when no partition pruning has been applied. */
    @Nullable
    private List<Map<String, String>> requiredPartitions;

    public HoodieTableSource(
            ResolvedSchema schema,
            Path path,
            List<String> partitionKeys,
            String defaultPartName,
            org.apache.flink.configuration.Configuration conf) {
        this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null);
    }

    /**
     * Creates a source, optionally carrying push-down state (used by {@link #copy()}).
     *
     * @param requiredPartitions pruned partitions, or {@code null} for no pruning
     * @param requiredPos projected field indices, or {@code null} for all fields
     * @param limit pushed-down limit, or {@code null} for no limit
     * @param filters pushed-down filters, or {@code null} for none
     */
    public HoodieTableSource(
            ResolvedSchema schema,
            Path path,
            List<String> partitionKeys,
            String defaultPartName,
            org.apache.flink.configuration.Configuration conf,
            @Nullable List<Map<String, String>> requiredPartitions,
            @Nullable int[] requiredPos,
            @Nullable Long limit,
            @Nullable List<ResolvedExpression> filters) {
        this.schema = schema;
        this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
        this.path = path;
        this.partitionKeys = partitionKeys;
        this.defaultPartName = defaultPartName;
        this.conf = conf;
        this.requiredPartitions = requiredPartitions;
        this.requiredPos = requiredPos == null
                ? IntStream.range(0, this.tableRowType.getFieldCount()).toArray()
                : requiredPos;
        this.limit = limit == null ? NO_LIMIT_CONSTANT : limit.longValue();
        this.filters = filters == null ? Collections.emptyList() : filters;
        this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
        this.metaClient = StreamerUtil.metaClientForReader(conf, this.hadoopConf);
        this.fileIndex = FileIndex.instance(this.path, this.conf, this.tableRowType);
        // A copied source must keep the filters already pushed into the original's file index,
        // otherwise file pruning is silently lost after the planner copies this source.
        if (!this.filters.isEmpty()) {
            this.fileIndex.setFilters(this.filters);
        }
        this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
    }

    @Override
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        return new DataStreamScanProviderAdapter() {
            @Override
            public boolean isBounded() {
                return !conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
            }

            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                @SuppressWarnings("unchecked")
                TypeInformation<RowData> typeInfo =
                        (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
                OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
                if (!conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
                    InputFormatSourceFunction<RowData> sourceFunction =
                            new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
                    return execEnv.addSource(sourceFunction, asSummaryString(), typeInfo)
                            .name(getSourceOperatorName("bounded_source"))
                            .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
                }
                // Streaming: a single monitor discovers splits, readers consume them keyed by file id.
                StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
                        conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes,
                        getRequiredPartitionPaths());
                return new DataStreamSource<>(
                        execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
                                .setParallelism(1)
                                .keyBy(MergeOnReadInputSplit::getFileId)
                                .transform("split_reader", typeInfo,
                                        StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true)))
                                .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)));
            }
        };
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return OptionsResolver.emitChangelog(this.conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
    }

    @Override
    public DynamicTableSource copy() {
        return new HoodieTableSource(this.schema, this.path, this.partitionKeys, this.defaultPartName, this.conf,
                this.requiredPartitions, this.requiredPos, this.limit, this.filters);
    }

    @Override
    public String asSummaryString() {
        return "HudiTableSource";
    }

    /**
     * Keeps the simple call expressions for file-index pruning. No filter is reported as accepted:
     * every input filter is returned as remaining, so the planner still evaluates all of them.
     */
    @Override
    public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> filters) {
        this.filters = filters.stream()
                .filter(ExpressionUtils::isSimpleCallExpression)
                .collect(Collectors.toList());
        this.fileIndex.setFilters(this.filters);
        return SupportsFilterPushDown.Result.of(Collections.emptyList(), new ArrayList<>(filters));
    }

    @Override
    public Optional<List<Map<String, String>>> listPartitions() {
        return Optional.of(this.fileIndex.getPartitions(
                this.partitionKeys, this.defaultPartName, this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)));
    }

    @Override
    public void applyPartitions(List<Map<String, String>> partitions) {
        this.requiredPartitions = partitions;
    }

    @Override
    public boolean supportsNestedProjection() {
        return false;
    }

    /** Records the top-level field index of each projected field (nested projection is unsupported). */
    @Override
    public void applyProjection(int[][] projectedFields) {
        this.requiredPos = Arrays.stream(projectedFields).mapToInt(fieldPath -> fieldPath[0]).toArray();
    }

    @Override
    public void applyLimit(long limit) {
        this.limit = limit;
    }

    /** Returns the row data type of the projected fields, bridged to {@link RowData}. */
    public DataType getProducedDataType() {
        String[] fieldNames = this.schema.getColumnNames().toArray(new String[0]);
        DataType[] fieldTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]);
        DataTypes.Field[] fields = Arrays.stream(this.requiredPos)
                .mapToObj(i -> DataTypes.FIELD(fieldNames[i], fieldTypes[i]))
                .toArray(DataTypes.Field[]::new);
        return DataTypes.ROW(fields).bridgedTo(RowData.class);
    }

    /** Builds an operator name of the form {@code name(table=[t], fields=[a, b])}. */
    public String getSourceOperatorName(String operatorName) {
        String[] columnNames = this.schema.getColumnNames().toArray(new String[0]);
        List<String> fields = Arrays.stream(this.requiredPos)
                .mapToObj(i -> columnNames[i])
                .collect(Collectors.toList());
        return operatorName
                + "(table=" + Collections.singletonList(this.conf.getString(FlinkOptions.TABLE_NAME))
                + ", fields=" + fields
                + ")";
    }

    /** Returns relative paths of the pruned partitions, or {@code null} when no pruning was applied. */
    @Nullable
    public Set<String> getRequiredPartitionPaths() {
        if (this.requiredPartitions == null) {
            return null;
        }
        return FilePathUtils.toRelativePartitionPaths(
                this.partitionKeys, this.requiredPartitions, this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
    }

    /**
     * Builds one merge-on-read split per latest merged file slice of every required partition.
     *
     * <p>Returns an empty list when there are no partitions or the filtered timeline has no instant.
     *
     * @throws HoodieException when partitions exist but contain no files
     */
    private List<MergeOnReadInputSplit> buildFileIndex() {
        this.fileIndex.setPartitionPaths(getRequiredPartitionPaths());
        List<String> relPartitionPaths = this.fileIndex.getOrBuildPartitionPaths();
        if (relPartitionPaths.isEmpty()) {
            return Collections.emptyList();
        }
        FileStatus[] fileStatuses = this.fileIndex.getFilesInPartitions();
        if (fileStatuses.length == 0) {
            throw new HoodieException("No files found for reading in user provided path.");
        }
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(this.metaClient,
                this.metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(), fileStatuses);
        Option<String> latestCommitOpt = fsView.getLastInstant().map(instant -> instant.getTimestamp());
        if (!latestCommitOpt.isPresent()) {
            LOG.warn("No completed instant found on the timeline of table {}, returns no input splits", this.path);
            return Collections.emptyList();
        }
        String latestCommit = latestCommitOpt.get();
        String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
        // Split numbers are assigned in sequential encounter order (partition by partition).
        AtomicInteger splitCounter = new AtomicInteger(0);
        return relPartitionPaths.stream()
                .flatMap(partition -> fsView.getLatestMergedFileSlicesBeforeOrOn(partition, latestCommit))
                .map(fileSlice -> {
                    String basePath = fileSlice.getBaseFile().map(baseFile -> baseFile.getPath()).orElse(null);
                    Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                            .sorted(HoodieLogFile.getLogFileComparator())
                            .map(logFile -> logFile.getPath().toString())
                            .collect(Collectors.toList()));
                    return new MergeOnReadInputSplit(splitCounter.getAndIncrement(), basePath, logPaths, latestCommit,
                            this.metaClient.getBasePath(), this.maxCompactionMemoryInBytes, mergeType, null,
                            fileSlice.getFileId());
                })
                .collect(Collectors.toList());
    }

    public InputFormat<RowData, ?> getInputFormat() {
        return getInputFormat(false);
    }

    /** Returns the streaming input format when {@code isStreaming} is true, else the batch one. */
    @VisibleForTesting
    public InputFormat<RowData, ?> getInputFormat(boolean isStreaming) {
        return isStreaming ? getStreamInputFormat() : getBatchInputFormat();
    }

    /**
     * Chooses the bounded input format from {@link FlinkOptions#QUERY_TYPE}.
     *
     * @throws HoodieException for an unknown query type or table type
     */
    private InputFormat<RowData, ?> getBatchInputFormat() {
        final Schema tableAvroSchema = getTableAvroSchema();
        final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
        final RowType rowType = (RowType) rowDataType.getLogicalType();
        final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();

        final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
        switch (queryType) {
            case FlinkOptions.QUERY_TYPE_SNAPSHOT: {
                final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
                switch (tableType) {
                    case MERGE_ON_READ: {
                        final List<MergeOnReadInputSplit> inputSplits = buildFileIndex();
                        if (inputSplits.isEmpty()) {
                            LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
                            return InputFormats.EMPTY_INPUT_FORMAT;
                        }
                        return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, inputSplits, false);
                    }
                    case COPY_ON_WRITE:
                        return baseFileOnlyInputFormat();
                    default:
                        throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
                }
            }
            case FlinkOptions.QUERY_TYPE_READ_OPTIMIZED:
                return baseFileOnlyInputFormat();
            case FlinkOptions.QUERY_TYPE_INCREMENTAL: {
                final IncrementalInputSplits.Result result = IncrementalInputSplits.builder()
                        .conf(this.conf)
                        .path(FilePathUtils.toFlinkPath(this.path))
                        .rowType(this.tableRowType)
                        .maxCompactionMemoryInBytes(this.maxCompactionMemoryInBytes)
                        .requiredPartitions(getRequiredPartitionPaths())
                        .build()
                        .inputSplits(this.metaClient, this.hadoopConf);
                if (result.isEmpty()) {
                    LOG.warn("No input splits generate for incremental read, returns empty collection instead");
                    return InputFormats.EMPTY_INPUT_FORMAT;
                }
                return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, result.getInputSplits(), false);
            }
            default:
                throw new HoodieException(String.format("Invalid query type : '%s', options ['%s', '%s', '%s'] are supported now",
                        queryType, FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, FlinkOptions.QUERY_TYPE_INCREMENTAL));
        }
    }

    /**
     * Builds the streaming merge-on-read input format; only the snapshot query type is supported.
     * Deletes are emitted only for MERGE_ON_READ tables.
     */
    private InputFormat<RowData, ?> getStreamInputFormat() {
        // Without a meta client or committed data the table schema cannot be resolved; use the DDL.
        final Schema tableAvroSchema = (this.metaClient == null || !tableDataExists()) ? inferSchemaFromDdl() : getTableAvroSchema();
        final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
        final RowType rowType = (RowType) rowDataType.getLogicalType();
        final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
        final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
        if (!FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
            throw new HoodieException(String.format("Invalid query type : '%s', options ['%s'] are supported now",
                    queryType, FlinkOptions.QUERY_TYPE_SNAPSHOT));
        }
        final boolean emitDelete = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)) == HoodieTableType.MERGE_ON_READ;
        return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList(), emitDelete);
    }

    /** Whether the active timeline has a last commit with valid data. */
    private boolean tableDataExists() {
        return this.metaClient.getActiveTimeline().getLastCommitMetadataWithValidData().isPresent();
    }

    private MergeOnReadInputFormat mergeOnReadInputFormat(
            RowType rowType,
            RowType requiredRowType,
            Schema tableAvroSchema,
            DataType rowDataType,
            List<MergeOnReadInputSplit> inputSplits,
            boolean emitDelete) {
        final MergeOnReadTableState tableState = new MergeOnReadTableState(
                rowType,
                requiredRowType,
                tableAvroSchema.toString(),
                AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
                inputSplits,
                this.conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
        return MergeOnReadInputFormat.builder()
                .config(this.conf)
                .tableState(tableState)
                .fieldTypes(rowDataType.getChildren())
                .defaultPartName(this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
                .limit(this.limit)
                .emitDelete(emitDelete)
                .build();
    }

    /** Reads only the latest base files of the required partitions (no log merging). */
    private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
        final FileStatus[] fileStatuses = getReadFiles();
        if (fileStatuses.length == 0) {
            return InputFormats.EMPTY_INPUT_FORMAT;
        }
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(this.metaClient,
                this.metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(), fileStatuses);
        Path[] paths = fsView.getLatestBaseFiles()
                .map(baseFile -> baseFile.getFileStatus().getPath())
                .toArray(Path[]::new);
        return new CopyOnWriteInputFormat(
                FilePathUtils.toFlinkPaths(paths),
                this.schema.getColumnNames().toArray(new String[0]),
                this.schema.getColumnDataTypes().toArray(new DataType[0]),
                this.requiredPos,
                this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
                this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit,
                HadoopConfigurations.getParquetConf(this.conf, this.hadoopConf),
                this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE));
    }

    /** Derives an Avro schema from the DDL row type plus Hoodie metadata fields (flag from CHANGELOG_ENABLED). */
    private Schema inferSchemaFromDdl() {
        return HoodieAvroUtils.addMetadataFields(
                AvroSchemaConverter.convertToSchema(this.tableRowType), this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
    }

    /** Resolves the table Avro schema from the table metadata, falling back to the DDL schema on any failure. */
    @VisibleForTesting
    public Schema getTableAvroSchema() {
        try {
            return new TableSchemaResolver(this.metaClient).getTableAvroSchema();
        } catch (Throwable th) {
            LOG.warn("Get table avro schema error, use schema from the DDL instead", th);
            return inferSchemaFromDdl();
        }
    }

    @VisibleForTesting
    public HoodieTableMetaClient getMetaClient() {
        return this.metaClient;
    }

    @VisibleForTesting
    public org.apache.flink.configuration.Configuration getConf() {
        return this.conf;
    }

    /** Reloads the timeline and clears partition pruning and file index state. */
    @VisibleForTesting
    public void reset() {
        this.metaClient.reloadActiveTimeline();
        this.requiredPartitions = null;
        this.fileIndex.reset();
    }

    /** Lists the files of the required partitions; empty when no partition paths are found. */
    @VisibleForTesting
    public FileStatus[] getReadFiles() {
        this.fileIndex.setPartitionPaths(getRequiredPartitionPaths());
        if (this.fileIndex.getOrBuildPartitionPaths().isEmpty()) {
            return new FileStatus[0];
        }
        return this.fileIndex.getFilesInPartitions();
    }
}
