package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
/* loaded from: input_file:org/apache/iceberg/flink/source/IcebergSource.class */
public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
    // Class-wide logger; used when planning batch splits and when restoring enumerator state.
    private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);
    // Loader for the source table; cloned for batch planning and handed to the continuous planner.
    private final TableLoader tableLoader;
    // Resolved scan settings (streaming flag, planning parallelism, case sensitivity, ...).
    private final ScanContext scanContext;
    // Function handed to every IcebergSourceReader created by this source.
    private final ReaderFunction<T> readerFunction;
    // Creates the split assigner used by the enumerator, fresh or from restored pending splits.
    private final SplitAssignerFactory assignerFactory;
    // Optional split ordering passed to the readers; not null-checked by the constructor.
    private final SerializableComparator<IcebergSourceSplit> splitComparator;
    // Emitter handed to every IcebergSourceReader created by this source.
    private final SerializableRecordEmitter<T> emitter;
    // Value of Table#name() captured at construction; used in names, thread names, logs and metrics.
    private final String tableName;

    /* loaded from: input_file:org/apache/iceberg/flink/source/IcebergSource$Builder.class */
    /**
     * Fluent builder that collects the table handle, read options and pluggable components needed
     * to construct an {@link IcebergSource}.
     *
     * <p>Most setters only record a string entry in the read-options map; those entries are
     * resolved against the table and the Flink configuration when {@link #build()} runs. Setters
     * taking boxed values ignore {@code null} so that an unset option keeps its default.
     *
     * @param <T> type of the records produced by the built source
     */
    public static class Builder<T> {
        private TableLoader tableLoader;
        private Table table;
        private SplitAssignerFactory splitAssignerFactory;
        private SerializableComparator<IcebergSourceSplit> splitComparator;
        private ReaderFunction<T> readerFunction;
        private TableSchema projectedFlinkSchema;
        // Recorded by exposeLocality(boolean) but not consulted anywhere in this builder.
        private Boolean exposeLocality;
        private ReadableConfig flinkConfig = new Configuration();
        private final ScanContext.Builder contextBuilder = ScanContext.builder();
        private final Map<String, String> readOptions = Maps.newHashMap();

        Builder() {
        }

        /** Sets the loader used to load the table (when none was given) and by the built source. */
        public Builder<T> tableLoader(TableLoader loader) {
            this.tableLoader = loader;
            return this;
        }

        /** Sets an already loaded table; when set, {@link #build()} does not load it again. */
        public Builder<T> table(Table newTable) {
            this.table = newTable;
            return this;
        }

        /**
         * Sets a custom split assigner factory.
         *
         * <p>Note: if a watermark column ends up configured, {@link #build()} replaces this factory
         * with an ordered, watermark-based one.
         */
        public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
            this.splitAssignerFactory = assignerFactory;
            return this;
        }

        /** Sets the comparator used to order splits when no assigner factory is supplied. */
        public Builder<T> splitComparator(SerializableComparator<IcebergSourceSplit> newSplitComparator) {
            this.splitComparator = newSplitComparator;
            return this;
        }

        /** Sets a custom reader function; when unset, {@link #build()} picks a default one. */
        public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
            this.readerFunction = newReaderFunction;
            return this;
        }

        /** Replaces the Flink configuration used to resolve options (defaults to an empty one). */
        public Builder<T> flinkConfig(ReadableConfig config) {
            this.flinkConfig = config;
            return this;
        }

        /** Records the case-sensitivity read option. */
        public Builder<T> caseSensitive(boolean newCaseSensitive) {
            this.readOptions.put(FlinkReadOptions.CASE_SENSITIVE, Boolean.toString(newCaseSensitive));
            return this;
        }

        /** Records the snapshot-id read option; {@code null} is ignored. */
        public Builder<T> useSnapshotId(Long newSnapshotId) {
            if (newSnapshotId != null) {
                this.readOptions.put(FlinkReadOptions.SNAPSHOT_ID.key(), Long.toString(newSnapshotId));
            }
            return this;
        }

        /** Records the streaming starting strategy by its enum name; the argument must not be null. */
        public Builder<T> streamingStartingStrategy(StreamingStartingStrategy newStartingStrategy) {
            this.readOptions.put(FlinkReadOptions.STARTING_STRATEGY, newStartingStrategy.name());
            return this;
        }

        /** Records the start-snapshot-timestamp read option; {@code null} is ignored. */
        public Builder<T> startSnapshotTimestamp(Long newStartSnapshotTimestamp) {
            if (newStartSnapshotTimestamp != null) {
                this.readOptions.put(FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.key(), Long.toString(newStartSnapshotTimestamp));
            }
            return this;
        }

        /** Records the start-snapshot-id read option; {@code null} is ignored. */
        public Builder<T> startSnapshotId(Long newStartSnapshotId) {
            if (newStartSnapshotId != null) {
                this.readOptions.put(FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(newStartSnapshotId));
            }
            return this;
        }

        /** Records the tag read option. */
        public Builder<T> tag(String tagName) {
            this.readOptions.put(FlinkReadOptions.TAG.key(), tagName);
            return this;
        }

        /** Records the branch read option. */
        public Builder<T> branch(String branchName) {
            this.readOptions.put(FlinkReadOptions.BRANCH.key(), branchName);
            return this;
        }

        /** Records the start-tag read option. */
        public Builder<T> startTag(String startTagName) {
            this.readOptions.put(FlinkReadOptions.START_TAG.key(), startTagName);
            return this;
        }

        /** Records the end-tag read option. */
        public Builder<T> endTag(String endTagName) {
            this.readOptions.put(FlinkReadOptions.END_TAG.key(), endTagName);
            return this;
        }

        /** Records the end-snapshot-id read option; {@code null} is ignored. */
        public Builder<T> endSnapshotId(Long newEndSnapshotId) {
            if (newEndSnapshotId != null) {
                this.readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(newEndSnapshotId));
            }
            return this;
        }

        /** Records the as-of-timestamp read option; {@code null} is ignored. */
        public Builder<T> asOfTimestamp(Long newAsOfTimestamp) {
            if (newAsOfTimestamp != null) {
                this.readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(newAsOfTimestamp));
            }
            return this;
        }

        /** Records the split-size read option; {@code null} is ignored. */
        public Builder<T> splitSize(Long newSplitSize) {
            if (newSplitSize != null) {
                this.readOptions.put(FlinkReadOptions.SPLIT_SIZE, Long.toString(newSplitSize));
            }
            return this;
        }

        /** Records the split-lookback read option; {@code null} is ignored. */
        public Builder<T> splitLookback(Integer newSplitLookback) {
            if (newSplitLookback != null) {
                this.readOptions.put(FlinkReadOptions.SPLIT_LOOKBACK, Integer.toString(newSplitLookback));
            }
            return this;
        }

        /** Records the split file-open-cost read option; {@code null} is ignored. */
        public Builder<T> splitOpenFileCost(Long newSplitOpenFileCost) {
            if (newSplitOpenFileCost != null) {
                this.readOptions.put(FlinkReadOptions.SPLIT_FILE_OPEN_COST, Long.toString(newSplitOpenFileCost));
            }
            return this;
        }

        /** Records the streaming read option. */
        public Builder<T> streaming(boolean streaming) {
            this.readOptions.put(FlinkReadOptions.STREAMING, Boolean.toString(streaming));
            return this;
        }

        /**
         * Records the monitor-interval read option, encoded as the duration's nanoseconds followed
         * by {@code " ns"}; {@code null} is ignored.
         */
        public Builder<T> monitorInterval(Duration newMonitorInterval) {
            if (newMonitorInterval != null) {
                this.readOptions.put(FlinkReadOptions.MONITOR_INTERVAL, newMonitorInterval.toNanos() + " ns");
            }
            return this;
        }

        /** Records the name mapping under the {@code schema.name-mapping.default} key. */
        public Builder<T> nameMapping(String newNameMapping) {
            this.readOptions.put("schema.name-mapping.default", newNameMapping);
            return this;
        }

        /** Sets the projected Iceberg schema directly on the scan context builder. */
        public Builder<T> project(Schema newProjectedSchema) {
            this.contextBuilder.project(newProjectedSchema);
            return this;
        }

        /**
         * Sets the projection as a Flink schema; it is converted against the table schema in
         * {@link #build()} and then applied to the scan context builder.
         */
        public Builder<T> project(TableSchema newProjectedFlinkSchema) {
            this.projectedFlinkSchema = newProjectedFlinkSchema;
            return this;
        }

        /** Sets the filter expressions on the scan context builder. */
        public Builder<T> filters(List<Expression> newFilters) {
            this.contextBuilder.filters(newFilters);
            return this;
        }

        /** Records the limit read option; {@code null} is ignored. */
        public Builder<T> limit(Long newLimit) {
            if (newLimit != null) {
                this.readOptions.put(FlinkReadOptions.LIMIT, Long.toString(newLimit));
            }
            return this;
        }

        /** Records the include-column-stats read option. */
        public Builder<T> includeColumnStats(boolean newIncludeColumnStats) {
            this.readOptions.put(FlinkReadOptions.INCLUDE_COLUMN_STATS, Boolean.toString(newIncludeColumnStats));
            return this;
        }

        /** Records the planning parallelism under the worker-pool-size configuration key. */
        public Builder<T> planParallelism(int planParallelism) {
            this.readOptions.put(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.key(), Integer.toString(planParallelism));
            return this;
        }

        /** Records the locality flag on this builder. */
        public Builder<T> exposeLocality(boolean newExposeLocality) {
            this.exposeLocality = newExposeLocality;
            return this;
        }

        /** Records the max-allowed-planning-failures read option. */
        public Builder<T> maxAllowedPlanningFailures(int maxAllowedPlanningFailures) {
            this.readOptions.put(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(), Integer.toString(maxAllowedPlanningFailures));
            return this;
        }

        /** Records an arbitrary read option. */
        public Builder<T> set(String property, String value) {
            this.readOptions.put(property, value);
            return this;
        }

        /** Records all entries of the given map as read options. */
        public Builder<T> setAll(Map<String, String> properties) {
            this.readOptions.putAll(properties);
            return this;
        }

        /**
         * Records the watermark-column read option.
         *
         * @throws IllegalArgumentException if a split assigner factory has already been set
         */
        public Builder<T> watermarkColumn(String columnName) {
            Preconditions.checkArgument(this.splitAssignerFactory == null, "Watermark column and SplitAssigner should not be set in the same source");
            this.readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
            return this;
        }

        /** Records the watermark column time unit by its enum name; the argument must not be null. */
        public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
            this.readOptions.put(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, timeUnit.name());
            return this;
        }

        /**
         * Records all entries of the given map as read options.
         *
         * @deprecated behaves exactly like {@link #setAll(Map)}; use that method instead
         */
        @Deprecated
        public Builder<T> properties(Map<String, String> properties) {
            this.readOptions.putAll(properties);
            return this;
        }

        /**
         * Resolves all recorded options and creates the source.
         *
         * <p>Steps, in order: load the table if none was supplied; resolve the scan configuration;
         * apply the Flink-schema projection; when a watermark column is configured, switch to a
         * watermark emitter and a watermark-ordered assigner (overriding any assigner factory set
         * earlier); pick a default reader function and assigner factory where none was provided.
         *
         * @return the configured source
         * @throws NullPointerException if neither a table nor a table loader was provided
         * @throws UncheckedIOException if loading the table or closing the loader fails with an I/O error
         */
        public IcebergSource<T> build() {
            loadTableIfAbsent();
            this.contextBuilder.resolveConfig(this.table, this.readOptions, this.flinkConfig);
            Schema icebergSchema = this.table.schema();
            if (this.projectedFlinkSchema != null) {
                this.contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, this.projectedFlinkSchema));
            }

            SerializableRecordEmitter<T> recordEmitter = SerializableRecordEmitter.defaultEmitter();
            FlinkReadConf flinkReadConf = new FlinkReadConf(this.table, this.readOptions, this.flinkConfig);
            String watermarkColumn = flinkReadConf.watermarkColumn();
            TimeUnit watermarkTimeUnit = flinkReadConf.watermarkColumnTimeUnit();
            if (watermarkColumn != null) {
                // The watermark extractor is built from column statistics, so request them for that column.
                this.contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
                ColumnStatsWatermarkExtractor watermarkExtractor = new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
                recordEmitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
                this.splitAssignerFactory = new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
            }

            ScanContext context = this.contextBuilder.build();
            if (this.readerFunction == null) {
                this.readerFunction = defaultReaderFunction(context);
            }
            if (this.splitAssignerFactory == null) {
                this.splitAssignerFactory = this.splitComparator == null
                        ? new SimpleSplitAssignerFactory()
                        : new OrderedSplitAssignerFactory(this.splitComparator);
            }
            return new IcebergSource<>(this.tableLoader, context, this.readerFunction, this.splitAssignerFactory, this.splitComparator, this.table, recordEmitter);
        }

        /** Loads the table through the table loader when no table was supplied, always closing the loader. */
        private void loadTableIfAbsent() {
            if (this.table != null) {
                return;
            }
            Preconditions.checkNotNull(this.tableLoader, "tableLoader is required when table is not set.");
            // try-with-resources guarantees the loader is closed even when open()/loadTable() fails.
            try (TableLoader loader = this.tableLoader) {
                loader.open();
                this.table = loader.loadTable();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /**
         * Creates the default reader function: a metadata reader for metadata tables, otherwise a
         * row-data reader. The cast is unchecked because the default readers are not parameterized
         * by {@code T}; callers needing another record type must supply their own reader function.
         */
        @SuppressWarnings("unchecked")
        private ReaderFunction<T> defaultReaderFunction(ScanContext context) {
            if (this.table instanceof BaseMetadataTable) {
                return (ReaderFunction<T>) new MetaDataReaderFunction(this.flinkConfig, this.table.schema(), context.project(), this.table.io(), this.table.encryption());
            }
            return (ReaderFunction<T>) new RowDataReaderFunction(this.flinkConfig, this.table.schema(), context.project(), context.nameMapping(), context.caseSensitive(), this.table.io(), this.table.encryption(), context.filters());
        }
    }

    /**
     * Creates a source from fully resolved components; normally invoked through the builder.
     *
     * <p>The table loader, reader function, assigner factory and table are mandatory. Only the
     * table's name is retained from the {@code table} argument.
     *
     * @throws NullPointerException if any mandatory argument is {@code null}
     */
    IcebergSource(TableLoader tableLoader, ScanContext scanContext, ReaderFunction<T> readerFunction, SplitAssignerFactory splitAssignerFactory, SerializableComparator<IcebergSourceSplit> serializableComparator, Table table, SerializableRecordEmitter<T> serializableRecordEmitter) {
        // Validate every mandatory argument up front, before any state is assigned.
        Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
        Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
        Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required.");
        Preconditions.checkNotNull(table, "table is required.");

        // Table access and scan settings.
        this.tableName = table.name();
        this.tableLoader = tableLoader;
        this.scanContext = scanContext;

        // Split assignment.
        this.assignerFactory = splitAssignerFactory;
        this.splitComparator = serializableComparator;

        // Record reading and emission.
        this.readerFunction = readerFunction;
        this.emitter = serializableRecordEmitter;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the display name of this source: {@code "IcebergSource-"} followed by the table name. */
    public String name() {
        final String prefix = "IcebergSource-";
        return prefix + this.tableName;
    }

    /** Builds a planning thread name from the table name, a dash and a freshly generated random UUID. */
    private String planningThreadName() {
        final String uniqueSuffix = UUID.randomUUID().toString();
        return this.tableName + "-" + uniqueSuffix;
    }

    /**
     * Plans all splits of the table once, for bounded (batch) execution.
     *
     * <p>A clone of the table loader is opened for the duration of planning and a dedicated worker
     * pool sized by the scan context's planning parallelism is used. The loader clone is closed
     * via try-with-resources, so a failure while closing is attached as a suppressed exception
     * instead of replacing the original planning failure. The pool is always shut down.
     *
     * @param threadName name passed to the worker pool factory for the planning threads
     * @return the planned splits
     * @throws UncheckedIOException if an {@link IOException} is raised while opening, using or
     *     closing the table loader
     */
    private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
        ExecutorService workerPool = ThreadPools.newWorkerPool(threadName, this.scanContext.planParallelism().intValue());
        try (TableLoader loader = this.tableLoader.m33clone()) {
            loader.open();
            List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), this.scanContext, workerPool);
            LOG.info("Discovered {} splits from table {} during job initialization", splits.size(), this.tableName);
            return splits;
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to close table loader", e);
        } finally {
            workerPool.shutdown();
        }
    }

    /** Reports the source as continuous-unbounded when the scan is streaming, otherwise as bounded. */
    public Boundedness getBoundedness() {
        if (this.scanContext.isStreaming()) {
            return Boundedness.CONTINUOUS_UNBOUNDED;
        }
        return Boundedness.BOUNDED;
    }

    /**
     * Creates a reader wired with this source's emitter, reader function and split comparator.
     * Reader metrics are registered on the context's metric group under the table name.
     */
    public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
        IcebergSourceReaderMetrics readerMetrics = new IcebergSourceReaderMetrics(sourceReaderContext.metricGroup(), this.tableName);
        return new IcebergSourceReader(this.emitter, readerMetrics, this.readerFunction, this.splitComparator, sourceReaderContext);
    }

    /** Creates a fresh split enumerator, i.e. one that starts without any restored state. */
    public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(SplitEnumeratorContext<IcebergSourceSplit> splitEnumeratorContext) {
        final IcebergEnumeratorState noRestoredState = null;
        return createEnumerator(splitEnumeratorContext, noRestoredState);
    }

    /** Re-creates the split enumerator from the given previously checkpointed state. */
    public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> restoreEnumerator(SplitEnumeratorContext<IcebergSourceSplit> splitEnumeratorContext, IcebergEnumeratorState icebergEnumeratorState) {
        SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> restored = createEnumerator(splitEnumeratorContext, icebergEnumeratorState);
        return restored;
    }

    /** Returns a new split serializer configured with the scan's case-sensitivity setting. */
    public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
        final ScanContext context = this.scanContext;
        return new IcebergSourceSplitSerializer(context.caseSensitive());
    }

    /** Returns a new enumerator-state serializer configured with the scan's case-sensitivity setting. */
    public SimpleVersionedSerializer<IcebergEnumeratorState> getEnumeratorCheckpointSerializer() {
        final ScanContext context = this.scanContext;
        return new IcebergEnumeratorStateSerializer(context.caseSensitive());
    }

    /**
     * Creates the split enumerator, either fresh or from restored checkpoint state.
     *
     * <p>The assigner is seeded with the restored pending splits when state is present. In
     * streaming mode a continuous enumerator with its own split planner is returned. In bounded
     * mode the table is planned once up front, but only when nothing was restored: the restored
     * state already carries the splits that are still pending, so planning again would add every
     * split a second time and cause duplicate reads.
     *
     * @param enumContext enumerator context supplied by Flink
     * @param enumState restored enumerator state, or {@code null} for a fresh start
     * @return a continuous enumerator for streaming scans, otherwise a static enumerator
     */
    private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(SplitEnumeratorContext<IcebergSourceSplit> enumContext, @Nullable IcebergEnumeratorState enumState) {
        SplitAssigner assigner;
        if (enumState == null) {
            assigner = this.assignerFactory.createAssigner();
        } else {
            LOG.info("Iceberg source restored {} splits from state for table {}", enumState.pendingSplits().size(), this.tableName);
            assigner = this.assignerFactory.createAssigner(enumState.pendingSplits());
        }

        if (this.scanContext.isStreaming()) {
            return new ContinuousIcebergEnumerator(enumContext, assigner, this.scanContext, new ContinuousSplitPlannerImpl(this.tableLoader, this.scanContext, planningThreadName()), enumState);
        }

        if (enumState == null) {
            // Only plan the table when starting fresh; restored state already holds the pending splits.
            assigner.onDiscoveredSplits(planSplitsForBatch(planningThreadName()));
        }
        return new StaticIcebergEnumerator(enumContext, assigner);
    }

    /**
     * Creates an empty builder for a source producing records of type {@code T}.
     *
     * @param <T> record type of the source to build
     */
    public static <T> Builder<T> builder() {
        Builder<T> freshBuilder = new Builder<>();
        return freshBuilder;
    }

    /** Creates an empty builder for a source producing Flink {@link RowData} records. */
    public static Builder<RowData> forRowData() {
        Builder<RowData> rowDataBuilder = new Builder<>();
        return rowDataBuilder;
    }

    // Compiler-generated bridge (see the bridge/synthetic markers) kept by the decompiler: it
    // accepts the erased (raw context, Object) signature, casts both arguments, and delegates to
    // the typed restoreEnumerator overload declared in this class.
    public /* bridge */ /* synthetic */ SplitEnumerator restoreEnumerator(SplitEnumeratorContext splitEnumeratorContext, Object obj) throws Exception {
        return restoreEnumerator((SplitEnumeratorContext<IcebergSourceSplit>) splitEnumeratorContext, (IcebergEnumeratorState) obj);
    }

    /**
     * Synthetic helper for closing a resource at the end of a try block.
     *
     * <p>When a primary throwable is given, any failure raised by {@code close()} is attached to
     * it as a suppressed exception; without one, a failure from {@code close()} propagates.
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th != null) {
            try {
                autoCloseable.close();
            } catch (Throwable closeFailure) {
                th.addSuppressed(closeFailure);
            }
            return;
        }
        autoCloseable.close();
    }
}
