package org.apache.beam.sdk.io.iceberg;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/WatchForSnapshots.class */
public class WatchForSnapshots extends PTransform<PBegin, PCollection<KV<String, List<SnapshotInfo>>>> {
    private static final Logger LOG = LoggerFactory.getLogger(WatchForSnapshots.class);
    private final Duration pollInterval;
    private final IcebergScanConfig scanConfig;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/iceberg/WatchForSnapshots$PersistSnapshotProgress.class */
    /**
     * Stateful {@link DoFn} that remembers, per key, the ID of the last snapshot it emitted and
     * strips already-emitted snapshots from each incoming batch before forwarding it.
     *
     * <p>For every element the function:
     *
     * <ol>
     *   <li>reads the stored snapshot ID (if any) and drops every snapshot up to and including
     *       the first one whose ID matches it;
     *   <li>emits the remaining snapshots under the same key;
     *   <li>stores the ID of the last emitted snapshot and updates the metrics.
     * </ol>
     *
     * <p>If nothing remains after trimming, the element is skipped: nothing is emitted and the
     * stored ID is left untouched.
     */
    public static class PersistSnapshotProgress extends DoFn<KV<String, List<SnapshotInfo>>, KV<String, List<SnapshotInfo>>> {
        // NOTE(review): both metrics are registered under SnapshotPollFn rather than this class.
        // Left unchanged so existing metric names are preserved -- confirm this is intentional.
        private final Gauge latestSnapshot = Metrics.gauge(SnapshotPollFn.class, "latestSnapshot");
        private final Counter snapshotsObserved = Metrics.counter(SnapshotPollFn.class, "snapshotsObserved");

        /** Per-key state cell holding the ID of the last snapshot that was emitted. */
        @DoFn.StateId("latestObservedSnapshotId")
        private final StateSpec<ValueState<Long>> latestObservedSnapshotId = StateSpecs.value();

        PersistSnapshotProgress() {
        }

        /**
         * Forwards the not-yet-observed tail of the incoming snapshot list and checkpoints at its
         * last element.
         *
         * @param kv key paired with the list of snapshots fetched by one poll
         * @param valueState state cell holding the last emitted snapshot ID; reads {@code null}
         *     when nothing has been stored yet
         * @param outputReceiver receives the key paired with the trimmed snapshot list
         */
        @DoFn.ProcessElement
        public void process(@DoFn.Element KV<String, List<SnapshotInfo>> kv, @DoFn.StateId("latestObservedSnapshotId") @DoFn.AlwaysFetched ValueState<Long> valueState, DoFn.OutputReceiver<KV<String, List<SnapshotInfo>>> outputReceiver) {
            List<SnapshotInfo> snapshots = kv.getValue();
            Long lastObservedId = valueState.read();
            if (lastObservedId != null) {
                snapshots = dropAlreadyObserved(snapshots, lastObservedId);
            }
            if (snapshots.isEmpty()) {
                // Every snapshot in this batch was already emitted (or the batch was empty to
                // begin with). There is nothing to forward and no new checkpoint to record;
                // Iterables.getLast would throw NoSuchElementException on an empty list.
                return;
            }

            SnapshotInfo checkpoint = Iterables.getLast(snapshots);
            outputReceiver.output(KV.of(kv.getKey(), snapshots));

            List<Long> snapshotIds = snapshots.stream().map(SnapshotInfo::getSnapshotId).collect(Collectors.toList());
            LOG.info("New poll fetched {} snapshots: {}. Checkpointing at snapshot {} of timestamp {}.", snapshots.size(), snapshotIds, checkpoint.getSnapshotId(), checkpoint.getTimestampMillis());

            valueState.write(checkpoint.getSnapshotId());
            this.latestSnapshot.set(checkpoint.getSnapshotId());
            this.snapshotsObserved.inc(snapshots.size());
        }

        /**
         * Returns the part of {@code snapshots} that follows the first entry whose ID equals
         * {@code lastObservedId}, or the whole list unchanged when no entry matches.
         */
        private static List<SnapshotInfo> dropAlreadyObserved(List<SnapshotInfo> snapshots, long lastObservedId) {
            for (int index = 0; index < snapshots.size(); index++) {
                if (snapshots.get(index).getSnapshotId() == lastObservedId) {
                    return snapshots.subList(index + 1, snapshots.size());
                }
            }
            return snapshots;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/iceberg/WatchForSnapshots$SnapshotPollFn.class */
    /**
     * Poll function that, on each invocation, refreshes the table and reports the snapshots that
     * appeared since the previous invocation.
     *
     * <p>The lower bound of the next range is kept in the {@code fromSnapshotId} field: it is
     * initialised from the scan configuration on the first poll and advanced to the table's
     * current snapshot ID after every poll that finds a new current snapshot. When the scan
     * configuration resolves an upper-bound snapshot, every poll result is marked complete.
     */
    public static class SnapshotPollFn extends Watch.Growth.PollFn<String, List<SnapshotInfo>> {
        private final IcebergScanConfig scanConfig;
        // Exclusive lower bound for the next poll; null until first resolved (and may stay null
        // if the scan configuration yields no starting snapshot).
        private Long fromSnapshotId;
        boolean isCacheSetup = false;

        SnapshotPollFn(IcebergScanConfig icebergScanConfig) {
            this.scanConfig = icebergScanConfig;
        }

        /**
         * Polls the table once.
         *
         * @param tableIdentifier input element; used to look the table up in {@code TableCache}
         * @param context poll context (unused)
         * @return a result holding at most one timestamped list of new snapshots; complete when
         *     the scan configuration resolves an upper-bound snapshot, incomplete otherwise
         */
        public Watch.Growth.PollResult<List<SnapshotInfo>> apply(String tableIdentifier, Contextful.Fn.Context context) {
            // Set the cache up once per instance, on the first poll.
            if (!this.isCacheSetup) {
                TableCache.setup(this.scanConfig);
                this.isCacheSetup = true;
            }
            Table table = TableCache.getRefreshed(tableIdentifier);

            Long toSnapshot = ReadUtils.getToSnapshot(table, this.scanConfig);
            boolean isComplete = toSnapshot != null;
            if (this.fromSnapshotId == null) {
                this.fromSnapshotId = ReadUtils.getFromSnapshotExclusive(table, this.scanConfig);
            }

            // Nothing to report when the table has no snapshot yet or its current snapshot is the
            // one this function already advanced to.
            Snapshot currentSnapshot = table.currentSnapshot();
            if (currentSnapshot == null || Objects.equal(currentSnapshot.snapshotId(), this.fromSnapshotId)) {
                return getPollResult(null, isComplete);
            }

            Long currentSnapshotId = currentSnapshot.snapshotId();
            long upperBound = MoreObjects.firstNonNull(toSnapshot, currentSnapshotId);
            List<SnapshotInfo> newSnapshots = ReadUtils.snapshotsBetween(table, tableIdentifier, this.fromSnapshotId, upperBound);
            this.fromSnapshotId = currentSnapshotId;
            return getPollResult(newSnapshots, isComplete);
        }

        /**
         * Wraps {@code list} in a poll result. A null or empty list produces a result with no
         * outputs; otherwise the list is emitted as a single value timestamped with the
         * timestamp of its first snapshot.
         */
        private Watch.Growth.PollResult<List<SnapshotInfo>> getPollResult(List<SnapshotInfo> list, boolean z) {
            ImmutableList.Builder<TimestampedValue<List<SnapshotInfo>>> outputs = ImmutableList.builder();
            // An empty list has no first element to take a timestamp from, so treat it like null.
            if (list != null && !list.isEmpty()) {
                Instant timestamp = Instant.ofEpochMilli(list.get(0).getTimestampMillis());
                outputs.add(TimestampedValue.of(list, timestamp));
            }
            return z ? Watch.Growth.PollResult.complete(outputs.build()) : Watch.Growth.PollResult.incomplete(outputs.build());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the transform.
     *
     * @param icebergScanConfig scan configuration; supplies the table identifier and is handed
     *     to the poll function
     * @param duration interval passed to the watch as its poll interval
     */
    public WatchForSnapshots(IcebergScanConfig icebergScanConfig, Duration duration) {
        this.scanConfig = icebergScanConfig;
        this.pollInterval = duration;
    }

    /**
     * Builds the pipeline fragment: a single-element collection holding the table identifier,
     * a watch that polls it with {@link SnapshotPollFn} at the configured interval, and a
     * {@link PersistSnapshotProgress} step applied to the watch output.
     *
     * @param pBegin pipeline root the fragment is attached to
     * @return table identifier paired with each list of snapshots emitted by the final step
     */
    public PCollection<KV<String, List<SnapshotInfo>>> expand(PBegin pBegin) {
        PCollection<String> tableIdentifier = pBegin.apply(Create.of(this.scanConfig.getTableIdentifier()));

        PCollection<KV<String, List<SnapshotInfo>>> polledSnapshots =
                tableIdentifier.apply(
                        "Scan Table Snapshots",
                        Watch.growthOf(new SnapshotPollFn(this.scanConfig))
                                .withPollInterval(this.pollInterval)
                                .withOutputCoder(ListCoder.of(SnapshotInfo.getCoder())));

        return polledSnapshots.apply("Persist Snapshot Progress", ParDo.of(new PersistSnapshotProgress()));
    }
}
