package org.apache.beam.sdk.io.iceberg;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/AppendFilesToTables.class */
/**
 * Commits the data files described by incoming {@link FileWriteResult}s to their destination
 * Iceberg tables.
 *
 * <p>Results are keyed by the string form of their table identifier, grouped, and every group is
 * appended to its table in a single append operation. A {@code KV<tableIdentifier, SnapshotInfo>}
 * is emitted for each commit that is actually performed.
 */
class AppendFilesToTables extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, SnapshotInfo>>> {
    private static final Logger LOG = LoggerFactory.getLogger(AppendFilesToTables.class);
    private final IcebergCatalogConfig catalogConfig;
    private final String manifestFilePrefix;

    /**
     * Appends one group of {@link FileWriteResult}s (all belonging to the same table) to that table
     * and emits the resulting snapshot.
     */
    public static class AppendFilesToTablesDoFn extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, SnapshotInfo>> {
        private final Counter snapshotsCreated;
        private final Distribution committedDataFileByteSize;
        private final Distribution committedDataFileRecordCount;
        private final IcebergCatalogConfig catalogConfig;
        private final String manifestFilePrefix;
        // Created lazily by getCatalog(); transient, so it is not serialized with the DoFn.
        private transient Catalog catalog;

        /**
         * @param catalogConfig configuration used to lazily obtain the {@link Catalog}
         * @param manifestFilePrefix prefix placed in the file name of every manifest written here
         */
        private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig, String manifestFilePrefix) {
            this.snapshotsCreated = Metrics.counter(AppendFilesToTables.class, "snapshotsCreated");
            this.committedDataFileByteSize = Metrics.distribution(RecordWriter.class, "committedDataFileByteSize");
            this.committedDataFileRecordCount = Metrics.distribution(RecordWriter.class, "committedDataFileRecordCount");
            this.catalogConfig = catalogConfig;
            this.manifestFilePrefix = manifestFilePrefix;
        }

        /** Returns the catalog, creating it from the config on first use. */
        private Catalog getCatalog() {
            if (this.catalog == null) {
                this.catalog = this.catalogConfig.catalog();
            }
            return this.catalog;
        }

        /**
         * Reports whether the given results reference more than one partition spec id.
         *
         * <p>Walks the iterable once with a single iterator and returns {@code false} for an empty
         * iterable instead of failing on {@code next()}.
         */
        private boolean containsMultiplePartitionSpecs(Iterable<FileWriteResult> fileWriteResults) {
            Iterator<FileWriteResult> results = fileWriteResults.iterator();
            if (!results.hasNext()) {
                return false;
            }
            int firstSpecId = results.next().getSerializableDataFile().getPartitionSpecId();
            while (results.hasNext()) {
                if (results.next().getSerializableDataFile().getPartitionSpecId() != firstSpecId) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Loads the destination table, appends the grouped files unless {@link #shouldSkip} says
         * otherwise, and outputs the table's current snapshot at the end of the window.
         *
         * @param element table identifier string paired with the write results for that table
         * @param out receiver for the {@code (tableIdentifier, snapshot)} pair
         * @param window window of the element; its max timestamp is used as the output timestamp
         * @throws IOException if writing a manifest file fails
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<String, Iterable<FileWriteResult>> element, DoFn.OutputReceiver<KV<String, SnapshotInfo>> out, BoundedWindow window) throws IOException {
            String tableIdentifier = element.getKey();
            Table table = getCatalog().loadTable(TableIdentifier.parse(tableIdentifier));
            Iterable<FileWriteResult> fileWriteResults = element.getValue();
            if (shouldSkip(table, fileWriteResults)) {
                return;
            }
            // Files written under different partition specs cannot share a manifest, so that case
            // goes through explicitly written per-spec manifests.
            if (containsMultiplePartitionSpecs(fileWriteResults)) {
                appendManifestFiles(table, fileWriteResults);
            } else {
                appendDataFiles(table, fileWriteResults);
            }
            Snapshot snapshot = table.currentSnapshot();
            LOG.info("Created new snapshot for table '{}': {}", tableIdentifier, snapshot);
            this.snapshotsCreated.inc();
            out.outputWithTimestamp(KV.of(tableIdentifier, SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
        }

        /** Appends every data file directly to one append operation and commits it. */
        private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteResults) {
            AppendFiles append = table.newAppend();
            for (FileWriteResult result : fileWriteResults) {
                DataFile dataFile = result.getDataFile(table.specs());
                append.appendFile(dataFile);
                this.committedDataFileByteSize.update(dataFile.fileSizeInBytes());
                this.committedDataFileRecordCount.update(dataFile.recordCount());
            }
            append.commit();
        }

        /**
         * Groups the data files by partition spec id, writes one manifest file per spec, and commits
         * all manifests in a single append operation.
         *
         * @throws IOException if closing a manifest writer fails
         */
        private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults) throws IOException {
            // One UUID per commit attempt; shared by all manifest file names written below.
            String commitUuid = UUID.randomUUID().toString();
            Map<Integer, PartitionSpec> specs = table.specs();
            Map<Integer, List<DataFile>> dataFilesBySpec = new HashMap<>();
            for (FileWriteResult result : fileWriteResults) {
                DataFile dataFile = result.getDataFile(specs);
                dataFilesBySpec.computeIfAbsent(dataFile.specId(), id -> new ArrayList<>()).add(dataFile);
            }
            AppendFiles append = table.newAppend();
            for (Map.Entry<Integer, List<DataFile>> entry : dataFilesBySpec.entrySet()) {
                PartitionSpec spec = Preconditions.checkStateNotNull(specs.get(entry.getKey()));
                // The FileIO stays open until the manifest is fully written: the writer's output
                // stream is obtained from it, so it must not be closed while entries are still
                // being added. try-with-resources also tolerates a null resource.
                try (FileIO io = table.io()) {
                    ManifestWriter<DataFile> writer = createManifestWriter(table.location(), commitUuid, spec, io);
                    // Alias so the writer is closed even when add() throws, while the original
                    // reference stays in scope for toManifestFile() below.
                    try (ManifestWriter<DataFile> closingWriter = writer) {
                        for (DataFile dataFile : entry.getValue()) {
                            closingWriter.add(dataFile);
                            this.committedDataFileByteSize.update(dataFile.fileSizeInBytes());
                            this.committedDataFileRecordCount.update(dataFile.recordCount());
                        }
                    }
                    // Called only after the writer has been closed by the block above.
                    append.appendManifest(writer.toManifestFile());
                }
            }
            append.commit();
        }

        /**
         * Opens a manifest writer at
         * {@code <tableLocation>/metadata/<manifestFilePrefix>-<uuid>-<specId>.manifest} with the Avro
         * file extension applied.
         */
        private ManifestWriter<DataFile> createManifestWriter(String tableLocation, String uuid, PartitionSpec spec, FileIO io) {
            String manifestPath = String.format("%s/metadata/%s-%s-%s.manifest", tableLocation, this.manifestFilePrefix, uuid, Integer.valueOf(spec.specId()));
            String location = FileFormat.AVRO.addExtension(manifestPath);
            return ManifestFiles.write(spec, io.newOutputFile(location));
        }

        /**
         * Decides whether this group should not be committed.
         *
         * <p>Returns {@code true} when there is nothing to commit, or when the first data file added
         * by the table's current snapshot has the same path as one of the given results. Returns
         * {@code false} when the table has no snapshot yet or its current snapshot added no data
         * files.
         */
        private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResults) {
            // Checked first so an empty group is skipped even for a table without snapshots.
            if (!fileWriteResults.iterator().hasNext()) {
                return true;
            }
            Snapshot currentSnapshot = table.currentSnapshot();
            if (currentSnapshot == null) {
                return false;
            }
            Iterator<DataFile> committedFiles = currentSnapshot.addedDataFiles(table.io()).iterator();
            if (!committedFiles.hasNext()) {
                // The current snapshot added no data files, so it cannot contain any of ours.
                return false;
            }
            String sampleCommittedPath = committedFiles.next().path().toString();
            for (FileWriteResult result : fileWriteResults) {
                if (result.getSerializableDataFile().getPath().equals(sampleCommittedPath)) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * @param catalogConfig configuration used by workers to obtain the Iceberg catalog
     * @param manifestFilePrefix prefix placed in the file name of every manifest written by this
     *     transform
     */
    public AppendFilesToTables(IcebergCatalogConfig catalogConfig, String manifestFilePrefix) {
        this.catalogConfig = catalogConfig;
        this.manifestFilePrefix = manifestFilePrefix;
    }

    /**
     * Keys the write results by table identifier, groups them, and appends each group to its table.
     *
     * @param writtenFiles descriptions of data files that have been written
     * @return one {@code (tableIdentifier, snapshot)} pair per commit performed
     */
    @Override
    public PCollection<KV<String, SnapshotInfo>> expand(PCollection<FileWriteResult> writtenFiles) {
        // Deliberately an anonymous class rather than a lambda.
        // NOTE(review): presumably the concrete class is what lets the key type be inferred for
        // WithKeys, since no withKeyType(...) is supplied here; confirm before simplifying.
        return writtenFiles.apply("Key metadata updates by table", WithKeys.of(new SerializableFunction<FileWriteResult, String>() {
            @Override
            public String apply(FileWriteResult fileWriteResult) {
                return fileWriteResult.getTableIdentifier().toString();
            }
        })).apply("Group metadata updates by table", GroupByKey.create()).apply("Append metadata updates to tables", ParDo.of(new AppendFilesToTablesDoFn(this.catalogConfig, this.manifestFilePrefix))).setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.getCoder()));
    }
}
