package org.apache.paimon.flink.source;

import java.time.Duration;
import java.util.List;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Projection;

/* loaded from: input_file:org/apache/paimon/flink/source/DataTableSource.class */
public class DataTableSource extends FlinkTableSource implements LookupTableSource, SupportsWatermarkPushDown {
    private final ObjectIdentifier tableIdentifier;
    private final boolean streaming;
    private final DynamicTableFactory.Context context;

    @Nullable
    private final LogStoreTableFactory logStoreTableFactory;

    @Nullable
    private WatermarkStrategy<RowData> watermarkStrategy;

    public DataTableSource(ObjectIdentifier objectIdentifier, Table table, boolean z, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory) {
        this(objectIdentifier, table, z, context, logStoreTableFactory, null, (int[][]) null, null, null);
    }

    private DataTableSource(ObjectIdentifier objectIdentifier, Table table, boolean z, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, @Nullable int[][] iArr, @Nullable Long l, @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
        super(table, predicate, iArr, l);
        this.tableIdentifier = objectIdentifier;
        this.streaming = z;
        this.context = context;
        this.logStoreTableFactory = logStoreTableFactory;
        this.predicate = predicate;
        this.projectFields = iArr;
        this.limit = l;
        this.watermarkStrategy = watermarkStrategy;
    }

    public ChangelogMode getChangelogMode() {
        if (this.streaming && !(this.table instanceof AppendOnlyFileStoreTable)) {
            if (this.table instanceof ChangelogValueCountFileStoreTable) {
                return ChangelogMode.all();
            }
            if (!(this.table instanceof ChangelogWithKeyFileStoreTable)) {
                throw new UnsupportedOperationException("Unsupported Table subclass " + this.table.getClass().getName() + " for streaming mode.");
            }
            Options fromMap = Options.fromMap(this.table.options());
            return ((Boolean) fromMap.get(CoreOptions.LOG_SCAN_REMOVE_NORMALIZE)).booleanValue() ? ChangelogMode.all() : (this.logStoreTableFactory != null || fromMap.get(CoreOptions.CHANGELOG_PRODUCER) == CoreOptions.ChangelogProducer.NONE) ? (fromMap.get(CoreOptions.LOG_CONSISTENCY) == CoreOptions.LogConsistency.TRANSACTIONAL && fromMap.get(CoreOptions.LOG_CHANGELOG_MODE) == CoreOptions.LogChangelogMode.ALL) ? ChangelogMode.all() : ChangelogMode.upsert() : ChangelogMode.all();
        }
        return ChangelogMode.insertOnly();
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        LogSourceProvider logSourceProvider = null;
        if (this.logStoreTableFactory != null) {
            logSourceProvider = this.logStoreTableFactory.createSourceProvider(this.context, scanContext, this.projectFields);
        }
        WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
        Options fromMap = Options.fromMap(this.table.options());
        if (watermarkStrategy != null) {
            if (((FlinkConnectorOptions.WatermarkEmitStrategy) fromMap.get(FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY)) == FlinkConnectorOptions.WatermarkEmitStrategy.ON_EVENT) {
                watermarkStrategy = new OnEventWatermarkStrategy(watermarkStrategy);
            }
            Duration duration = (Duration) fromMap.get(FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT);
            if (duration != null) {
                watermarkStrategy = watermarkStrategy.withIdleness(duration);
            }
            String str = (String) fromMap.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP);
            if (str != null) {
                try {
                    watermarkStrategy = WatermarkAlignUtils.withWatermarkAlignment(watermarkStrategy, str, (Duration) fromMap.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT), (Duration) fromMap.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
                } catch (NoSuchMethodError e) {
                    throw new RuntimeException("Flink 1.14 does not support watermark alignment, please check your Flink version.", e);
                }
            }
        }
        FlinkSourceBuilder withWatermarkStrategy = new FlinkSourceBuilder(this.tableIdentifier, this.table).withContinuousMode(this.streaming).withLogSourceProvider(logSourceProvider).withProjection(this.projectFields).withPredicate(this.predicate).withLimit(this.limit).withWatermarkStrategy(watermarkStrategy);
        return new PaimonDataStreamScanProvider(!this.streaming, streamExecutionEnvironment -> {
            return configureSource(withWatermarkStrategy, streamExecutionEnvironment);
        });
    }

    private DataStream<RowData> configureSource(FlinkSourceBuilder flinkSourceBuilder, StreamExecutionEnvironment streamExecutionEnvironment) {
        Options fromMap = Options.fromMap(this.table.options());
        Integer num = (Integer) fromMap.get(FlinkConnectorOptions.SCAN_PARALLELISM);
        List<Split> list = null;
        if (((Boolean) fromMap.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)).booleanValue()) {
            if (this.streaming) {
                num = (Integer) fromMap.get(CoreOptions.BUCKET);
            } else {
                list = this.table.newReadBuilder().withFilter(this.predicate).newScan().plan().splits();
                if (null != list) {
                    num = Integer.valueOf(list.size());
                }
                if (null != this.limit && this.limit.longValue() > 0) {
                    num = Integer.valueOf(Math.min(num.intValue(), this.limit.longValue() >= 2147483647L ? Integer.MAX_VALUE : this.limit.intValue()));
                }
                num = Integer.valueOf(null == num ? 1 : Math.max(1, num.intValue()));
            }
        }
        return flinkSourceBuilder.withParallelism(num).withSplits(list).withEnv(streamExecutionEnvironment).build();
    }

    public DynamicTableSource copy() {
        return new DataTableSource(this.tableIdentifier, this.table, this.streaming, this.context, this.logStoreTableFactory, this.predicate, this.projectFields, this.limit, this.watermarkStrategy);
    }

    public String asSummaryString() {
        return "Paimon-DataSource";
    }

    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext lookupContext) {
        if (this.limit != null) {
            throw new RuntimeException("Limit push down should not happen in Lookup source, but it is " + this.limit);
        }
        return LookupRuntimeProviderFactory.create(new FileStoreLookupFunction(this.table, this.projectFields == null ? IntStream.range(0, this.table.rowType().getFieldCount()).toArray() : Projection.of(this.projectFields).toTopLevelIndexes(), Projection.of(lookupContext.getKeys()).toTopLevelIndexes(), this.predicate));
    }
}
