package org.apache.beam.sdk.io.iceberg;

import java.util.Iterator;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/AppendFilesToTables.class */
class AppendFilesToTables extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, SnapshotInfo>>> {
    private static final Logger LOG = LoggerFactory.getLogger(AppendFilesToTables.class);
    private final IcebergCatalogConfig catalogConfig;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/iceberg/AppendFilesToTables$AppendFilesToTablesDoFn.class */
    public static class AppendFilesToTablesDoFn extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, SnapshotInfo>> {
        private final Counter snapshotsCreated;
        private final IcebergCatalogConfig catalogConfig;
        private transient Catalog catalog;

        private AppendFilesToTablesDoFn(IcebergCatalogConfig icebergCatalogConfig) {
            this.snapshotsCreated = Metrics.counter(AppendFilesToTables.class, "snapshotsCreated");
            this.catalogConfig = icebergCatalogConfig;
        }

        private Catalog getCatalog() {
            if (this.catalog == null) {
                this.catalog = this.catalogConfig.catalog();
            }
            return this.catalog;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<String, Iterable<FileWriteResult>> kv, DoFn.OutputReceiver<KV<String, SnapshotInfo>> outputReceiver, BoundedWindow boundedWindow) {
            if (((Iterable) kv.getValue()).iterator().hasNext()) {
                Table loadTable = getCatalog().loadTable(TableIdentifier.parse((String) kv.getKey()));
                AppendFiles newAppend = loadTable.newAppend();
                Iterator it = ((Iterable) kv.getValue()).iterator();
                while (it.hasNext()) {
                    newAppend.appendManifest(((FileWriteResult) it.next()).getManifestFile());
                }
                newAppend.commit();
                Snapshot currentSnapshot = loadTable.currentSnapshot();
                AppendFilesToTables.LOG.info("Created new snapshot for table '{}': {}", kv.getKey(), currentSnapshot);
                this.snapshotsCreated.inc();
                outputReceiver.outputWithTimestamp(KV.of((String) kv.getKey(), SnapshotInfo.fromSnapshot(currentSnapshot)), boundedWindow.maxTimestamp());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AppendFilesToTables(IcebergCatalogConfig icebergCatalogConfig) {
        this.catalogConfig = icebergCatalogConfig;
    }

    public PCollection<KV<String, SnapshotInfo>> expand(PCollection<FileWriteResult> pCollection) {
        return pCollection.apply("Key metadata updates by table", WithKeys.of(new SerializableFunction<FileWriteResult, String>() { // from class: org.apache.beam.sdk.io.iceberg.AppendFilesToTables.1
            public String apply(FileWriteResult fileWriteResult) {
                return fileWriteResult.getTableIdentifier().toString();
            }
        })).apply("Group metadata updates by table", GroupByKey.create()).apply("Append metadata updates to tables", ParDo.of(new AppendFilesToTablesDoFn(this.catalogConfig))).setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.CODER));
    }
}
