/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.spark.source.SparkInputPartition;
import org.apache.iceberg.spark.source.SparkPlanningUtil;
import org.apache.iceberg.spark.source.SparkRowReaderFactory;
import org.apache.iceberg.spark.source.StreamingOffset;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark {@link MicroBatchStream} over an Iceberg table.
 *
 * <p>Offsets are {@link StreamingOffset}s: a snapshot id, a position within that snapshot's added
 * data files, and a scan-all-files flag. Only {@code append} snapshots produce data. {@code replace}
 * snapshots are skipped. {@code delete} / {@code overwrite} snapshots are skipped only when the
 * corresponding skip option is enabled and otherwise fail the stream.
 *
 * <p>Admission control ({@link SupportsAdmissionControl}) sizes each micro-batch using the
 * configured maximum number of files and records per micro-batch.
 */
public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
    private static final Joiner SLASH = Joiner.on("/");
    private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
    /** Streaming partitions are not grouped, so every input partition uses an empty grouping key. */
    private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();

    private final Table table;
    private final String branch;
    private final boolean caseSensitive;
    /** Projected read schema, serialized to JSON so it can be shipped with each input partition. */
    private final String expectedSchema;
    private final Broadcast<Table> tableBroadcast;
    private final long splitSize;
    private final int splitLookback;
    private final long splitOpenFileCost;
    private final boolean localityPreferred;
    private final StreamingOffset initialOffset;
    private final boolean skipDelete;
    private final boolean skipOverwrite;
    /** Snapshots committed before this time (compared against {@code timestampMillis()}) are not streamed. */
    private final long fromTimestamp;
    private final int maxFilesPerMicroBatch;
    private final int maxRecordsPerMicroBatch;

    SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, SparkReadConf readConf, Schema expectedSchema, String checkpointLocation) {
        this.table = table;
        this.branch = readConf.branch();
        this.caseSensitive = readConf.caseSensitive();
        this.expectedSchema = SchemaParser.toJson(expectedSchema);
        this.localityPreferred = readConf.localityEnabled();
        this.tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
        this.splitSize = readConf.splitSize();
        this.splitLookback = readConf.splitLookback();
        this.splitOpenFileCost = readConf.splitOpenFileCost();
        this.fromTimestamp = readConf.streamFromTimestamp();
        this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
        this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
        // Reads the persisted initial offset from the checkpoint, or computes and persists it.
        InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation, this.fromTimestamp);
        this.initialOffset = initialOffsetStore.initialOffset();
        this.skipDelete = readConf.streamingSkipDeleteSnapshots();
        this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
    }

    /**
     * Returns an offset covering every added data file of the table's current snapshot.
     *
     * @return {@link StreamingOffset#START_OFFSET} when the table has no snapshot or the current
     *     snapshot is older than the configured start timestamp
     */
    @Override
    public Offset latestOffset() {
        this.table.refresh();
        Snapshot latestSnapshot = this.table.currentSnapshot();
        if (latestSnapshot == null || latestSnapshot.timestampMillis() < this.fromTimestamp) {
            return StreamingOffset.START_OFFSET;
        }
        return new StreamingOffset(latestSnapshot.snapshotId(), this.addedFilesCount(latestSnapshot), false);
    }

    /**
     * Plans the input partitions for the range {@code [start, end)}.
     *
     * @throws IllegalArgumentException if either offset is not a {@link StreamingOffset}
     */
    @Override
    public InputPartition[] planInputPartitions(Offset start, Offset end) {
        Preconditions.checkArgument(end instanceof StreamingOffset, "Invalid end offset: %s is not a StreamingOffset", end);
        Preconditions.checkArgument(start instanceof StreamingOffset, "Invalid start offset: %s is not a StreamingOffset", start);
        if (end.equals(StreamingOffset.START_OFFSET)) {
            return new InputPartition[0];
        }
        StreamingOffset endOffset = (StreamingOffset) end;
        StreamingOffset startOffset = (StreamingOffset) start;
        List<FileScanTask> fileScanTasks = this.planFiles(startOffset, endOffset);
        CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTasks), this.splitSize);
        List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(TableScanUtil.planTasks(splitTasks, this.splitSize, this.splitLookback, this.splitOpenFileCost));
        String[][] locations = this.computePreferredLocations(combinedScanTasks);
        InputPartition[] partitions = new InputPartition[combinedScanTasks.size()];
        for (int index = 0; index < combinedScanTasks.size(); ++index) {
            String[] preferredLocations = locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE;
            partitions[index] = new SparkInputPartition(EMPTY_GROUPING_KEY_TYPE, combinedScanTasks.get(index), this.tableBroadcast, this.branch, this.expectedSchema, this.caseSensitive, preferredLocations);
        }
        return partitions;
    }

    /** Returns block locations per task group when locality is enabled, otherwise {@code null}. */
    private String[][] computePreferredLocations(List<CombinedScanTask> taskGroups) {
        return this.localityPreferred ? SparkPlanningUtil.fetchBlockLocations(this.table.io(), taskGroups) : null;
    }

    @Override
    public PartitionReaderFactory createReaderFactory() {
        return new SparkRowReaderFactory();
    }

    @Override
    public Offset initialOffset() {
        return this.initialOffset;
    }

    @Override
    public Offset deserializeOffset(String json) {
        return StreamingOffset.fromJson(json);
    }

    /** No-op: this source keeps no per-batch state that needs committing. */
    @Override
    public void commit(Offset end) {
    }

    /** No-op: this source holds no resources that need releasing. */
    @Override
    public void stop() {
    }

    /**
     * Collects the file scan tasks between two offsets, walking snapshot by snapshot.
     *
     * <p>The range is {@code [startOffset.position, endOffset.position)}. The first snapshot starts at
     * the start offset's position. Each later snapshot starts at position 0. The end snapshot stops at
     * the end offset's position, and every other snapshot is read through all of its added files.
     */
    private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
        List<FileScanTask> fileScanTasks = Lists.newArrayList();
        StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset)
                ? determineStartingOffset(this.table, this.fromTimestamp)
                : startOffset;
        StreamingOffset currentOffset = null;
        do {
            if (currentOffset == null) {
                currentOffset = batchStartOffset;
            } else {
                // The loop only repeats while the current snapshot is not the end snapshot, so every
                // later iteration moves to the next snapshot and reads it from position 0.
                Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(this.table, currentOffset.snapshotId());
                currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
            }
            Snapshot snapshot = this.table.snapshot(currentOffset.snapshotId());
            this.validateCurrentSnapshotExists(snapshot, currentOffset);
            if (!this.shouldProcess(snapshot)) {
                LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), this.table.name());
                continue;
            }
            long endFileIndex = currentOffset.snapshotId() == endOffset.snapshotId()
                    ? endOffset.position()
                    : this.addedFilesCount(snapshot);
            MicroBatches.MicroBatch latestMicroBatch = MicroBatches.from(snapshot, this.table.io())
                    .caseSensitive(this.caseSensitive)
                    .specsById(this.table.specs())
                    .generate(currentOffset.position(), endFileIndex, Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
            fileScanTasks.addAll(latestMicroBatch.tasks());
        } while (currentOffset.snapshotId() != endOffset.snapshotId());
        return fileScanTasks;
    }

    /**
     * Decides whether a snapshot contributes data to the stream.
     *
     * @return {@code true} for appends; {@code false} for replaces and for deletes/overwrites whose
     *     skip option is enabled
     * @throws IllegalStateException for deletes/overwrites whose skip option is disabled, and for a
     *     null or unrecognized operation
     */
    private boolean shouldProcess(Snapshot snapshot) {
        String op = snapshot.operation();
        if (op != null) {
            switch (op) {
                case "append":
                    return true;
                case "replace":
                    return false;
                case "delete":
                    Preconditions.checkState(this.skipDelete, "Cannot process delete snapshot: %s, to ignore deletes, set %s=true", snapshot.snapshotId(), "streaming-skip-delete-snapshots");
                    return false;
                case "overwrite":
                    Preconditions.checkState(this.skipOverwrite, "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true", snapshot.snapshotId(), "streaming-skip-overwrite-snapshots");
                    return false;
                default:
                    break;
            }
        }
        // A missing operation is reported like an unknown one rather than as a NullPointerException.
        String opName = op == null ? null : op.toLowerCase(Locale.ROOT);
        throw new IllegalStateException(String.format("Cannot process unknown snapshot operation: %s (snapshot id %s)", opName, snapshot.snapshotId()));
    }

    /**
     * Computes where a stream with no prior progress should start.
     *
     * @return {@link StreamingOffset#START_OFFSET} when the table is empty, when the current snapshot
     *     is older than {@code fromTimestamp}, or when no ancestor follows it. Otherwise returns the
     *     oldest ancestor committed after {@code fromTimestamp} (or the oldest ancestor when
     *     {@code fromTimestamp} is null or the lookup fails with {@link IllegalStateException}).
     */
    private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
        if (table.currentSnapshot() == null) {
            return StreamingOffset.START_OFFSET;
        }
        if (fromTimestamp == null) {
            return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0L, false);
        }
        if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
            return StreamingOffset.START_OFFSET;
        }
        try {
            Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp);
            if (snapshot != null) {
                return new StreamingOffset(snapshot.snapshotId(), 0L, false);
            }
            return StreamingOffset.START_OFFSET;
        } catch (IllegalStateException e) {
            // The first snapshot after the timestamp could not be determined; fall back to the oldest ancestor.
            return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0L, false);
        }
    }

    /**
     * Computes the end offset of the next micro-batch starting at {@code startOffset}.
     *
     * <p>The walk goes through added data files snapshot by snapshot, up to the table's current
     * snapshot. It admits files until the per-batch file or record cap would be exceeded (see
     * {@link #admitsFile}). The returned position is exclusive. The {@code limit} argument is
     * currently not consulted; the caps come from the read configuration.
     *
     * @return the new end offset, {@link StreamingOffset#START_OFFSET} when there is nothing to
     *     stream yet, or {@code null} when no new data is available
     * @throws IllegalArgumentException if {@code startOffset} is not a {@link StreamingOffset}
     */
    @Override
    public Offset latestOffset(Offset startOffset, ReadLimit limit) {
        Preconditions.checkArgument(startOffset instanceof StreamingOffset, "Invalid start offset: %s is not a StreamingOffset", startOffset);
        this.table.refresh();
        Snapshot latestSnapshot = this.table.currentSnapshot();
        if (latestSnapshot == null || latestSnapshot.timestampMillis() < this.fromTimestamp) {
            return StreamingOffset.START_OFFSET;
        }
        // Pin the target snapshot once so the walk below has a fixed end point for this call.
        long latestSnapshotId = latestSnapshot.snapshotId();

        StreamingOffset startingOffset = (StreamingOffset) startOffset;
        if (startOffset.equals(StreamingOffset.START_OFFSET)) {
            startingOffset = determineStartingOffset(this.table, this.fromTimestamp);
        }
        Snapshot curSnapshot = this.table.snapshot(startingOffset.snapshotId());
        this.validateCurrentSnapshotExists(curSnapshot, startingOffset);

        long startPosOfSnapOffset = startingOffset.position();
        boolean scanAllFiles = startingOffset.shouldScanAllFiles();
        boolean shouldContinueReading = true;
        int curFilesAdded = 0;
        // long: a single admitted file may hold more records than fit in an int
        long curRecordCount = 0L;
        int curPos = 0;

        while (shouldContinueReading) {
            List<Pair<ManifestFile, Integer>> indexedManifests = MicroBatches.skippedManifestIndexesFromSnapshot(this.table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
            for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; ++idx) {
                Pair<ManifestFile, Integer> indexedManifest = indexedManifests.get(idx);
                // Position of this manifest's first file within the snapshot's added files.
                curPos = indexedManifest.second();
                try (CloseableIterable<FileScanTask> taskIterable = MicroBatches.openManifestFile(this.table.io(), this.table.specs(), this.caseSensitive, curSnapshot, indexedManifest.first(), scanAllFiles);
                     CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
                    while (taskIter.hasNext()) {
                        FileScanTask task = taskIter.next();
                        if (curPos >= startPosOfSnapOffset) {
                            long fileRecordCount = task.file().recordCount();
                            if (!this.admitsFile(curFilesAdded, curRecordCount, fileRecordCount)) {
                                // curPos now points at the first file that did not fit (exclusive end).
                                shouldContinueReading = false;
                                break;
                            }
                            ++curFilesAdded;
                            curRecordCount += fileRecordCount;
                        }
                        ++curPos;
                    }
                } catch (IOException ioe) {
                    LOG.warn("Failed to close task iterable", ioe);
                }
            }
            if (curSnapshot.snapshotId() == latestSnapshotId) {
                break;
            }
            if (shouldContinueReading) {
                // The current snapshot was fully consumed; move on and read only its successor's added files.
                startPosOfSnapOffset = -1;
                curSnapshot = SnapshotUtil.snapshotAfter(this.table, curSnapshot.snapshotId());
                scanAllFiles = false;
            }
        }
        StreamingOffset latestStreamingOffset = new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
        return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
    }

    /**
     * Decides whether one more data file fits in the micro-batch being sized by
     * {@link #latestOffset(Offset, ReadLimit)}.
     *
     * <p>The file cap is strict. The record cap is enforced only once at least one file has been
     * admitted. Without that exception, a file holding more records than
     * {@code maxRecordsPerMicroBatch} would be rejected on every trigger and the stream would never
     * advance.
     */
    private boolean admitsFile(int filesAdded, long recordsAdded, long fileRecordCount) {
        if (filesAdded + 1 > this.maxFilesPerMicroBatch) {
            return false;
        }
        return filesAdded == 0 || recordsAdded + fileRecordCount <= (long) this.maxRecordsPerMicroBatch;
    }

    /**
     * Returns the number of data files the snapshot added. Uses the {@code added-data-files}
     * summary entry when present and otherwise counts {@code addedDataFiles}.
     */
    private long addedFilesCount(Snapshot snapshot) {
        long addedFilesCount = PropertyUtil.propertyAsLong(snapshot.summary(), "added-data-files", -1L);
        return addedFilesCount == -1L ? Iterables.size(snapshot.addedDataFiles(this.table.io())) : addedFilesCount;
    }

    /** @throws IllegalStateException if the snapshot referenced by {@code currentOffset} is gone */
    private void validateCurrentSnapshotExists(Snapshot snapshot, StreamingOffset currentOffset) {
        if (snapshot == null) {
            throw new IllegalStateException(String.format("Cannot load current offset at snapshot %d, the snapshot was expired or removed", currentOffset.snapshotId()));
        }
    }

    /**
     * Returns the read limit derived from the configured per-micro-batch caps. A cap equal to
     * {@link Integer#MAX_VALUE} is treated as unset.
     */
    @Override
    public ReadLimit getDefaultReadLimit() {
        boolean filesCapped = this.maxFilesPerMicroBatch != Integer.MAX_VALUE;
        boolean rowsCapped = this.maxRecordsPerMicroBatch != Integer.MAX_VALUE;
        if (filesCapped && rowsCapped) {
            // The row limit must come from the record cap, not the file cap.
            ReadLimit[] readLimits = new ReadLimit[] {
                ReadLimit.maxFiles(this.maxFilesPerMicroBatch),
                ReadLimit.maxRows((long) this.maxRecordsPerMicroBatch)
            };
            return ReadLimit.compositeLimit(readLimits);
        }
        if (filesCapped) {
            return ReadLimit.maxFiles(this.maxFilesPerMicroBatch);
        }
        if (rowsCapped) {
            return ReadLimit.maxRows((long) this.maxRecordsPerMicroBatch);
        }
        return ReadLimit.allAvailable();
    }

    /**
     * Persists the stream's initial offset at {@code <checkpointLocation>/offsets/0}, so a restarted
     * query resumes from the same starting point.
     */
    private static class InitialOffsetStore {
        private final Table table;
        private final FileIO io;
        private final String initialOffsetLocation;
        private final Long fromTimestamp;

        InitialOffsetStore(Table table, String checkpointLocation, Long fromTimestamp) {
            this.table = table;
            this.io = table.io();
            this.initialOffsetLocation = SLASH.join(checkpointLocation, "offsets/0");
            this.fromTimestamp = fromTimestamp;
        }

        /**
         * Returns the persisted initial offset if one exists. Otherwise refreshes the table, computes
         * the starting offset, writes it, and returns it.
         */
        public StreamingOffset initialOffset() {
            InputFile inputFile = this.io.newInputFile(this.initialOffsetLocation);
            if (inputFile.exists()) {
                return this.readOffset(inputFile);
            }
            this.table.refresh();
            StreamingOffset offset = determineStartingOffset(this.table, this.fromTimestamp);
            OutputFile outputFile = this.io.newOutputFile(this.initialOffsetLocation);
            this.writeOffset(offset, outputFile);
            return offset;
        }

        /** Writes the offset's JSON as UTF-8 to {@code file}, wrapping I/O failures as unchecked. */
        private void writeOffset(StreamingOffset offset, OutputFile file) {
            try (PositionOutputStream outputStream = file.create()) {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
                writer.write(offset.json());
                // Flush the buffered writer before the underlying stream is closed by try-with-resources.
                writer.flush();
            } catch (IOException ioException) {
                throw new UncheckedIOException(String.format("Failed writing offset to: %s", this.initialOffsetLocation), ioException);
            }
        }

        /** Parses a {@link StreamingOffset} from {@code file}, always closing the stream. */
        private StreamingOffset readOffset(InputFile file) {
            try (SeekableInputStream in = file.newStream()) {
                return StreamingOffset.fromJson(in);
            } catch (IOException ioException) {
                throw new UncheckedIOException(String.format("Failed reading offset from: %s", this.initialOffsetLocation), ioException);
            }
        }
    }
}

