package org.apache.iceberg.spark.source;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.source.SparkBatchScan;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/spark/source/SparkMicroBatchStream.class */
/**
 * A Spark {@link MicroBatchStream} backed by an Iceberg {@link Table}.
 *
 * <p>Offsets handled by this stream are {@link StreamingOffset} instances, built from a snapshot
 * id, a position and a "scan all files" flag. The initial offset is resolved once, in the
 * constructor, through {@link InitialOffsetStore}. Each micro-batch is planned by walking the
 * snapshots from the start offset to the end offset and collecting the file scan tasks of every
 * append snapshot.
 */
public class SparkMicroBatchStream implements MicroBatchStream {
    private static final Joiner SLASH = Joiner.on("/");
    private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);

    private final Table table;
    private final boolean caseSensitive;
    /** JSON form of the expected read schema, as produced by {@link SchemaParser#toJson}. */
    private final String expectedSchema;
    private final Broadcast<Table> tableBroadcast;
    private final long splitSize;
    private final int splitLookback;
    private final long splitOpenFileCost;
    private final boolean localityPreferred;
    private final StreamingOffset initialOffset;
    private final boolean skipDelete;
    /** Lower timestamp bound for snapshots to stream; {@code null} means no bound. */
    private final Long fromTimestamp;

    /**
     * Resolves the initial offset of a stream and stores it in a file named {@code offsets/0}
     * under a base location, so that a later call with the same location reads the stored value
     * instead of recomputing it.
     */
    private static class InitialOffsetStore {
        private final Table table;
        private final FileIO fileIo;
        private final String initialOffsetLocation;
        private final Long fromTimestamp;

        /**
         * @param table table whose {@link FileIO} is used to read and write the offset file
         * @param checkpointLocation base location under which {@code offsets/0} is stored
         *     (presumably the stream's checkpoint directory; confirm against the caller)
         * @param fromTimestamp timestamp passed on to {@link #determineStartingOffset}; may be null
         */
        InitialOffsetStore(Table table, String checkpointLocation, Long fromTimestamp) {
            this.table = table;
            this.fileIo = table.io();
            this.initialOffsetLocation = SLASH.join(checkpointLocation, "offsets/0");
            this.fromTimestamp = fromTimestamp;
        }

        /**
         * Returns the stored initial offset if the offset file exists; otherwise refreshes the
         * table, determines the starting offset, writes it to the offset file and returns it.
         */
        public StreamingOffset initialOffset() {
            InputFile offsetFile = fileIo.newInputFile(initialOffsetLocation);
            if (offsetFile.exists()) {
                return readOffset(offsetFile);
            }

            table.refresh();
            StreamingOffset startingOffset = determineStartingOffset(table, fromTimestamp);
            writeOffset(startingOffset, fileIo.newOutputFile(initialOffsetLocation));
            return startingOffset;
        }

        /**
         * Writes the JSON form of {@code offset} to {@code outputFile} as UTF-8.
         *
         * @throws UncheckedIOException if creating, writing or closing the file fails
         */
        private void writeOffset(StreamingOffset offset, OutputFile outputFile) {
            // try-with-resources guarantees the stream is closed even when write/flush throws.
            try (PositionOutputStream outputStream = outputFile.create()) {
                BufferedWriter writer =
                        new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
                writer.write(offset.json());
                // Push buffered characters into the stream before it is closed.
                writer.flush();
            } catch (IOException e) {
                throw new UncheckedIOException(
                        String.format("Failed writing offset to: %s", initialOffsetLocation), e);
            }
        }

        /**
         * Reads a {@link StreamingOffset} from the JSON content of {@code inputFile}.
         *
         * @throws UncheckedIOException if opening, reading or closing the file fails
         */
        private StreamingOffset readOffset(InputFile inputFile) {
            // try-with-resources guarantees the stream is closed even when parsing throws.
            try (SeekableInputStream inputStream = inputFile.newStream()) {
                return StreamingOffset.fromJson(inputStream);
            } catch (IOException e) {
                throw new UncheckedIOException(
                        String.format("Failed reading offset from: %s", initialOffsetLocation), e);
            }
        }
    }

    /**
     * Creates the stream, broadcasts a serializable copy of the table and resolves the initial
     * offset (which may read or write the offset file under {@code checkpointLocation}).
     *
     * @param javaSparkContext context used to broadcast the table
     * @param table table to stream from
     * @param sparkReadConf source of split, locality, timestamp and skip-delete settings
     * @param caseSensitive case sensitivity passed to micro-batch planning and read tasks
     * @param schema expected read schema; kept as JSON
     * @param checkpointLocation base location for the initial offset file
     */
    public SparkMicroBatchStream(
            JavaSparkContext javaSparkContext,
            Table table,
            SparkReadConf sparkReadConf,
            boolean caseSensitive,
            Schema schema,
            String checkpointLocation) {
        this.table = table;
        this.caseSensitive = caseSensitive;
        this.expectedSchema = SchemaParser.toJson(schema);
        this.localityPreferred = sparkReadConf.localityEnabled();
        this.tableBroadcast = javaSparkContext.broadcast(SerializableTable.copyOf(table));
        this.splitSize = sparkReadConf.splitSize();
        this.splitLookback = sparkReadConf.splitLookback();
        this.splitOpenFileCost = sparkReadConf.splitOpenFileCost();
        this.fromTimestamp = sparkReadConf.streamFromTimestamp();
        this.initialOffset =
                new InitialOffsetStore(table, checkpointLocation, this.fromTimestamp).initialOffset();
        this.skipDelete = sparkReadConf.streamingSkipDeleteSnapshots();
    }

    /**
     * Refreshes the table and returns an offset for its current snapshot, positioned at the number
     * of files that snapshot added.
     *
     * @return {@link StreamingOffset#START_OFFSET} when the table has no current snapshot or the
     *     current snapshot is older than {@code fromTimestamp}; otherwise the offset described above
     */
    @Override
    public Offset latestOffset() {
        table.refresh();
        Snapshot latestSnapshot = table.currentSnapshot();
        if (latestSnapshot == null) {
            return StreamingOffset.START_OFFSET;
        }

        // A null fromTimestamp means "no lower bound", matching determineStartingOffset.
        if (fromTimestamp != null && latestSnapshot.timestampMillis() < fromTimestamp) {
            return StreamingOffset.START_OFFSET;
        }

        return new StreamingOffset(
                latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles()), false);
    }

    /**
     * Plans the input partitions for the micro-batch between two offsets.
     *
     * @param start start offset; must be a {@link StreamingOffset}
     * @param end end offset; must be a {@link StreamingOffset}
     * @return one read task per combined scan task, or an empty array when {@code end} equals
     *     {@link StreamingOffset#START_OFFSET}
     * @throws IllegalArgumentException if either offset is not a {@link StreamingOffset}
     */
    @Override
    public InputPartition[] planInputPartitions(Offset start, Offset end) {
        Preconditions.checkArgument(
                end instanceof StreamingOffset, "Invalid end offset: %s is not a StreamingOffset", end);
        Preconditions.checkArgument(
                start instanceof StreamingOffset, "Invalid start offset: %s is not a StreamingOffset", start);

        if (end.equals(StreamingOffset.START_OFFSET)) {
            return new InputPartition[0];
        }

        List<FileScanTask> fileScanTasks = planFiles((StreamingOffset) start, (StreamingOffset) end);
        CloseableIterable<FileScanTask> splitTasks =
                TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTasks), splitSize);
        List<CombinedScanTask> combinedScanTasks =
                Lists.newArrayList(
                        TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));

        InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
        // The worker pool is only used when locality is preferred; otherwise a null executor is passed.
        Tasks.range(readTasks.length)
                .stopOnFailure()
                .executeWith(localityPreferred ? ThreadPools.getWorkerPool() : null)
                .run(index -> {
                    readTasks[index] =
                            new SparkBatchScan.ReadTask(
                                    combinedScanTasks.get(index),
                                    tableBroadcast,
                                    expectedSchema,
                                    caseSensitive,
                                    localityPreferred);
                });
        return readTasks;
    }

    /**
     * Returns a new {@link SparkBatchScan.ReaderFactory}.
     *
     * <p>NOTE(review): the {@code 0} argument is presumably a batch size selecting row-based
     * reads; confirm against {@code SparkBatchScan.ReaderFactory}.
     */
    @Override
    public PartitionReaderFactory createReaderFactory() {
        return new SparkBatchScan.ReaderFactory(0);
    }

    /** Returns the initial offset resolved when this stream was constructed. */
    @Override
    public Offset initialOffset() {
        return initialOffset;
    }

    /** Parses a {@link StreamingOffset} from its JSON string form. */
    @Override
    public Offset deserializeOffset(String json) {
        return StreamingOffset.fromJson(json);
    }

    /** No-op. */
    @Override
    public void commit(Offset end) {
    }

    /** No-op. */
    @Override
    public void stop() {
    }

    /**
     * Collects the file scan tasks of every processable snapshot from {@code startOffset} up to and
     * including the snapshot of {@code endOffset}.
     *
     * <p>When {@code startOffset} is {@link StreamingOffset#START_OFFSET} the walk begins at the
     * offset returned by {@link #determineStartingOffset}. After the first snapshot, each following
     * snapshot is read from position 0.
     */
    private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
        List<FileScanTask> fileScanTasks = Lists.newArrayList();
        StreamingOffset batchStartOffset =
                StreamingOffset.START_OFFSET.equals(startOffset)
                        ? determineStartingOffset(table, fromTimestamp)
                        : startOffset;

        StreamingOffset currentOffset = null;
        do {
            if (currentOffset == null) {
                currentOffset = batchStartOffset;
            } else {
                Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
                currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
            }

            Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
            if (shouldProcess(snapshot)) {
                fileScanTasks.addAll(
                        MicroBatches.from(snapshot, table.io())
                                .caseSensitive(caseSensitive)
                                .specsById(table.specs())
                                .generate(
                                        currentOffset.position(),
                                        Long.MAX_VALUE,
                                        currentOffset.shouldScanAllFiles())
                                .tasks());
            } else {
                LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
            }
        } while (currentOffset.snapshotId() != endOffset.snapshotId());

        return fileScanTasks;
    }

    /**
     * Decides whether a snapshot contributes files to the stream.
     *
     * @return {@code true} only for append snapshots; delete (when skipping deletes is enabled)
     *     and replace snapshots return {@code false}
     * @throws IllegalStateException for a delete snapshot when skipping deletes is disabled, or for
     *     any operation other than append, replace or delete
     */
    private boolean shouldProcess(Snapshot snapshot) {
        String operation = snapshot.operation();
        Preconditions.checkState(
                !operation.equals(DataOperations.DELETE) || skipDelete,
                "Cannot process delete snapshot: %s, to ignore deletes, set %s=true.",
                snapshot.snapshotId(),
                SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
        Preconditions.checkState(
                operation.equals(DataOperations.DELETE)
                        || operation.equals(DataOperations.APPEND)
                        || operation.equals(DataOperations.REPLACE),
                "Cannot process %s snapshot: %s",
                operation.toLowerCase(Locale.ROOT),
                snapshot.snapshotId());
        return operation.equals(DataOperations.APPEND);
    }

    /**
     * Determines the offset at which streaming should start.
     *
     * @param table table to inspect (not refreshed here)
     * @param fromTimestamp lower timestamp bound, or {@code null} for none
     * @return {@link StreamingOffset#START_OFFSET} when the table has no current snapshot, when the
     *     current snapshot is older than {@code fromTimestamp}, or when no ancestor after the
     *     timestamp is found; otherwise an offset at position 0 of the chosen snapshot
     */
    public static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
        Snapshot currentSnapshot = table.currentSnapshot();
        if (currentSnapshot == null) {
            return StreamingOffset.START_OFFSET;
        }

        if (fromTimestamp == null) {
            // No timestamp bound: start from the oldest ancestor.
            return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0L, false);
        }

        if (currentSnapshot.timestampMillis() < fromTimestamp) {
            return StreamingOffset.START_OFFSET;
        }

        try {
            Snapshot firstAfter = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp);
            if (firstAfter != null) {
                return new StreamingOffset(firstAfter.snapshotId(), 0L, false);
            }
            return StreamingOffset.START_OFFSET;
        } catch (IllegalStateException e) {
            // oldestAncestorAfter could not resolve a snapshot; fall back to the oldest ancestor.
            return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0L, false);
        }
    }
}
