package org.apache.iceberg.flink.maintenance.operator;

import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.SingleThreadedIteratorSource;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/iceberg/flink/maintenance/operator/MonitorSource.class */
public class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
    private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class);
    private final TableLoader tableLoader;
    private final RateLimiterStrategy rateLimiterStrategy;
    private final long maxReadBack;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Never-ending iterator over the changes of a single table. Each {@link #next()} call refreshes
     * the table and returns one {@link TableChange} which merges the snapshots found between the
     * current snapshot and the snapshot that was current at the previous call.
     *
     * <p>The table loader is opened in the constructor; this class has no close hook, so the loader
     * is not closed here.
     */
    @VisibleForTesting
    public static class TableChangeIterator implements Iterator<TableChange> {
        private Long lastSnapshotId;
        private final long maxReadBack;
        private final Table table;

        /**
         * Opens the loader and loads the table which will be polled.
         *
         * @param tableLoader used to open and load the table
         * @param l id of the snapshot already processed, or {@code null} when none is known
         * @param j maximum number of snapshots examined by a single {@link #next()} call
         */
        TableChangeIterator(TableLoader tableLoader, Long l, long j) {
            tableLoader.open();
            this.table = tableLoader.loadTable();
            this.maxReadBack = j;
            this.lastSnapshotId = l;
        }

        /** Always {@code true}: the iterator never runs out of elements. */
        @Override
        public boolean hasNext() {
            return true;
        }

        /**
         * Refreshes the table and collects the changes since the last processed snapshot.
         *
         * <p>Starting from the current snapshot, the parent chain is followed until the last
         * processed snapshot is reached, a snapshot can no longer be found, or {@code maxReadBack}
         * snapshots have been examined. Snapshots whose operation is {@code REPLACE} are skipped.
         * Afterwards the current snapshot is remembered as processed, even if the read-back limit
         * stopped the walk early.
         *
         * @return the merged changes; an empty change if nothing is new or if any exception occurred
         *     (in that case the last processed snapshot id is left unchanged)
         */
        @Override
        public TableChange next() {
            try {
                table.refresh();

                Snapshot head = table.currentSnapshot();
                Long headId = null;
                if (head != null) {
                    headId = head.snapshotId();
                }

                TableChange changes = TableChange.empty();
                Long cursor = headId;
                long visited = 0;
                while (cursor != null && !cursor.equals(lastSnapshotId) && ++visited <= maxReadBack) {
                    Snapshot snapshot = table.snapshot(cursor);
                    if (snapshot == null) {
                        // The snapshot is not available in the table anymore, stop walking back
                        break;
                    }

                    if (!DataOperations.REPLACE.equals(snapshot.operation())) {
                        MonitorSource.LOG.debug("Reading snapshot {}", snapshot.snapshotId());
                        changes.merge(new TableChange(snapshot, table.io()));
                    } else {
                        MonitorSource.LOG.debug("Skipping replace snapshot {}", snapshot.snapshotId());
                    }

                    cursor = snapshot.parentId();
                }

                lastSnapshotId = headId;
                return changes;
            } catch (Exception e) {
                MonitorSource.LOG.warn("Failed to fetch table changes for {}", table, e);
                return TableChange.empty();
            }
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("lastSnapshotId", lastSnapshotId)
                    .add("maxReadBack", maxReadBack)
                    .add("table", table)
                    .toString();
        }
    }

    /* loaded from: input_file:org/apache/iceberg/flink/maintenance/operator/MonitorSource$TableChangeIteratorSerializer.class */
    private static final class TableChangeIteratorSerializer implements SimpleVersionedSerializer<Iterator<TableChange>> {
        private static final int CURRENT_VERSION = 1;
        private final TableLoader tableLoader;
        private final long maxReadBack;

        TableChangeIteratorSerializer(TableLoader tableLoader, long j) {
            this.tableLoader = tableLoader;
            this.maxReadBack = j;
        }

        public int getVersion() {
            return 1;
        }

        public byte[] serialize(Iterator<TableChange> it) throws IOException {
            Preconditions.checkArgument(it instanceof TableChangeIterator, "Use TableChangeIterator iterator. Found incompatible type: %s", it.getClass());
            TableChangeIterator tableChangeIterator = (TableChangeIterator) it;
            DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(8);
            dataOutputSerializer.writeLong(tableChangeIterator.lastSnapshotId != null ? tableChangeIterator.lastSnapshotId.longValue() : -1L);
            return dataOutputSerializer.getCopyOfBuffer();
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public TableChangeIterator m621deserialize(int i, byte[] bArr) throws IOException {
            if (i != 1) {
                throw new IOException("Unrecognized version or corrupt state: " + i);
            }
            long readLong = new DataInputDeserializer(bArr).readLong();
            return new TableChangeIterator(this.tableLoader, readLong != -1 ? Long.valueOf(readLong) : null, this.maxReadBack);
        }
    }

    /**
     * Creates a source which emits the changes of a table as {@link TableChange} records.
     *
     * @param tableLoader used for loading the monitored table; must not be null
     * @param rateLimiterStrategy strategy used to create the rate limiter for the reader; must not be
     *     null
     * @param j maximum number of snapshots examined when collecting changes; must be positive
     * @throws NullPointerException if the table loader or the rate limiter strategy is null
     * @throws IllegalArgumentException if {@code j} is not positive
     */
    public MonitorSource(TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long j) {
        Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
        Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy should not be null");
        Preconditions.checkArgument(j > 0, "Need to read at least 1 snapshot to work");
        this.tableLoader = tableLoader;
        this.rateLimiterStrategy = rateLimiterStrategy;
        this.maxReadBack = j;
    }

    /**
     * Reports this source as unbounded.
     *
     * @return always {@link Boundedness#CONTINUOUS_UNBOUNDED}
     */
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /**
     * Describes the type of the records emitted by this source.
     *
     * @return the type information created from the {@link TableChange} class
     */
    public TypeInformation<TableChange> getProducedType() {
        final Class<TableChange> producedClass = TableChange.class;
        return TypeInformation.of(producedClass);
    }

    /**
     * Creates a fresh iterator which has not processed any snapshot yet.
     *
     * @return a new {@link TableChangeIterator} with no last snapshot id
     */
    @Override // org.apache.iceberg.flink.maintenance.operator.SingleThreadedIteratorSource
    Iterator<TableChange> createIterator() {
        // A new iterator starts without a previously processed snapshot
        final Long noSnapshotProcessed = null;
        return new TableChangeIterator(tableLoader, noSnapshotProcessed, maxReadBack);
    }

    /**
     * Creates the serializer for the iterators of this source.
     *
     * @return a serializer which recreates iterators using this source's table loader and read-back
     *     limit
     */
    @Override // org.apache.iceberg.flink.maintenance.operator.SingleThreadedIteratorSource
    SimpleVersionedSerializer<Iterator<TableChange>> iteratorSerializer() {
        TableChangeIteratorSerializer serializer =
                new TableChangeIteratorSerializer(tableLoader, maxReadBack);
        return serializer;
    }

    /**
     * Wraps the reader created by the parent class into a rate limited reader.
     *
     * @param sourceReaderContext context passed on to the parent class when creating the reader
     * @return the parent's reader wrapped with a limiter created by the configured strategy
     * @throws Exception if the parent class fails to create the reader
     */
    @Override // org.apache.iceberg.flink.maintenance.operator.SingleThreadedIteratorSource
    public SourceReader<TableChange, SingleThreadedIteratorSource.GlobalSplit<TableChange>> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        // Diamond instead of the raw type, so the element and split types are inferred and checked.
        // The limiter is requested with the argument 1 (the parallelism parameter of the strategy).
        return new RateLimitedSourceReader<>(
                super.createReader(sourceReaderContext), this.rateLimiterStrategy.createRateLimiter(1));
    }
}
