package org.apache.beam.sdk.io.iceberg;

import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/WriteToDestinations.class */
public class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, IcebergWriteResult> {
    private static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
    public static final int FILE_TRIGGERING_BYTE_COUNT = 1073741824;
    static final int DEFAULT_NUM_FILE_SHARDS = 0;
    private final IcebergCatalogConfig catalogConfig;
    private final DynamicDestinations dynamicDestinations;
    private final Duration triggeringFrequency;

    /* JADX INFO: Access modifiers changed from: package-private */
    public WriteToDestinations(IcebergCatalogConfig icebergCatalogConfig, DynamicDestinations dynamicDestinations, Duration duration) {
        this.dynamicDestinations = dynamicDestinations;
        this.catalogConfig = icebergCatalogConfig;
        this.triggeringFrequency = duration;
    }

    public IcebergWriteResult expand(PCollection<KV<String, Row>> pCollection) {
        return new IcebergWriteResult(pCollection.getPipeline(), (pCollection.isBounded().equals(PCollection.IsBounded.UNBOUNDED) ? writeTriggered(pCollection) : writeUntriggered(pCollection)).apply(new AppendFilesToTables(this.catalogConfig)));
    }

    private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, Row>> pCollection) {
        Preconditions.checkArgumentNotNull(this.triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
        return pCollection.apply("WindowIntoGlobal", Window.into(new GlobalWindows())).apply(GroupIntoBatches.ofSize(500000L).withByteSize(1073741824L).withMaxBufferingDuration((Duration) Preconditions.checkArgumentNotNull(this.triggeringFrequency)).withShardedKey()).setCoder(KvCoder.of(ShardedKey.Coder.of(StringUtf8Coder.of()), IterableCoder.of(RowCoder.of(this.dynamicDestinations.getDataSchema())))).apply("WriteGroupedRows", new WriteGroupedRowsToFiles(this.catalogConfig, this.dynamicDestinations)).apply("ApplyUserTrigger", Window.into(new GlobalWindows()).triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf((Duration) Preconditions.checkArgumentNotNull(this.triggeringFrequency)))).discardingFiredPanes());
    }

    private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, Row>> pCollection) {
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(this.triggeringFrequency == null, "Triggering frequency is only applicable for streaming pipelines.");
        WriteUngroupedRowsToFiles.Result result = (WriteUngroupedRowsToFiles.Result) pCollection.apply("Fast-path write rows", new WriteUngroupedRowsToFiles(this.catalogConfig, this.dynamicDestinations));
        return PCollectionList.of(result.getWrittenFiles()).and(result.getSpilledRows().apply("Group spilled rows by destination shard", GroupByKey.create()).apply("Write remaining rows to files", new WriteGroupedRowsToFiles(this.catalogConfig, this.dynamicDestinations))).apply("Flatten Written Files", Flatten.pCollections());
    }
}
