package org.apache.beam.sdk.io;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.AutoValue_WriteFiles;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.dataflow.qual.Pure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
@AutoValue
/* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles.class */
public abstract class WriteFiles<UserT, DestinationT, OutputT> extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) WriteFiles.class);

    @Internal
    public static final Class<? extends WriteFiles> CONCRETE_CLASS = AutoValue_WriteFiles.class;
    private static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20;
    private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
    static final int UNKNOWN_SHARDNUM = -1;
    private FileBasedSink.WriteOperation<DestinationT, OutputT> writeOperation;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$ApplyShardingFunctionFn.class */
    public class ApplyShardingFunctionFn extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
        private final ShardingFunction<UserT, DestinationT> shardingFn;
        private final PCollectionView<Integer> numShardsView;

        ApplyShardingFunctionFn(ShardingFunction<UserT, DestinationT> shardingFunction, PCollectionView<Integer> pCollectionView) {
            this.numShardsView = pCollectionView;
            this.shardingFn = shardingFunction;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @DoFn.ProcessElement
        public void processElement(DoFn<UserT, KV<ShardedKey<Integer>, UserT>>.ProcessContext processContext) throws Exception {
            int intValue;
            WriteFiles.this.getDynamicDestinations().setSideInputAccessorFromProcessContext(processContext);
            if (this.numShardsView != null) {
                intValue = ((Integer) processContext.sideInput(this.numShardsView)).intValue();
            } else {
                Preconditions.checkNotNull(WriteFiles.this.getNumShardsProvider());
                intValue = WriteFiles.this.getNumShardsProvider().get().intValue();
            }
            Preconditions.checkArgument(intValue > 0, "Must have a positive number of shards specified for non-runner-determined sharding. Got %s", intValue);
            processContext.output(KV.of(this.shardingFn.assignShardKey(WriteFiles.this.getDynamicDestinations().getDestination(processContext.element()), processContext.element(), intValue), processContext.element()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue.Builder
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$Builder.class */
    public static abstract class Builder<UserT, DestinationT, OutputT> {
        abstract Builder<UserT, DestinationT, OutputT> setSink(FileBasedSink<UserT, DestinationT, OutputT> fileBasedSink);

        abstract Builder<UserT, DestinationT, OutputT> setComputeNumShards(PTransform<PCollection<UserT>, PCollectionView<Integer>> pTransform);

        abstract Builder<UserT, DestinationT, OutputT> setNumShardsProvider(ValueProvider<Integer> valueProvider);

        abstract Builder<UserT, DestinationT, OutputT> setWindowedWrites(boolean z);

        abstract Builder<UserT, DestinationT, OutputT> setMaxNumWritersPerBundle(int i);

        abstract Builder<UserT, DestinationT, OutputT> setSideInputs(List<PCollectionView<?>> list);

        abstract Builder<UserT, DestinationT, OutputT> setShardingFunction(ShardingFunction<UserT, DestinationT> shardingFunction);

        abstract WriteFiles<UserT, DestinationT, OutputT> build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$FinalizeTempFileBundles.class */
    public class FinalizeTempFileBundles extends PTransform<PCollection<List<FileBasedSink.FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
        private final PCollectionView<Integer> numShardsView;
        private final Coder<DestinationT> destinationCoder;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$FinalizeTempFileBundles$FinalizeFn.class */
        public class FinalizeFn extends DoFn<List<FileBasedSink.FileResult<DestinationT>>, KV<DestinationT, String>> {
            private FinalizeFn() {
            }

            /* JADX WARN: Multi-variable type inference failed */
            @DoFn.ProcessElement
            public void process(DoFn<List<FileBasedSink.FileResult<DestinationT>>, KV<DestinationT, String>>.ProcessContext processContext) throws Exception {
                WriteFiles.this.getDynamicDestinations().setSideInputAccessorFromProcessContext(processContext);
                Integer num = FinalizeTempFileBundles.this.numShardsView != null ? (Integer) processContext.sideInput(FinalizeTempFileBundles.this.numShardsView) : WriteFiles.this.getNumShardsProvider() != null ? WriteFiles.this.getNumShardsProvider().get() : null;
                ArrayList newArrayList = Lists.newArrayList((Iterable) processContext.element());
                WriteFiles.LOG.info("Finalizing {} file results", Integer.valueOf(newArrayList.size()));
                List<KV<FileBasedSink.FileResult<DestinationT>, ResourceId>> finalizeDestination = newArrayList.isEmpty() ? WriteFiles.this.writeOperation.finalizeDestination(WriteFiles.this.getDynamicDestinations().getDefaultDestination(), GlobalWindow.INSTANCE, num, newArrayList) : WriteFiles.this.finalizeAllDestinations(newArrayList, num);
                WriteFiles.this.writeOperation.moveToOutputFiles(finalizeDestination);
                for (KV<FileBasedSink.FileResult<DestinationT>, ResourceId> kv : finalizeDestination) {
                    processContext.output(KV.of(kv.getKey().getDestination(), kv.getValue().toString()));
                }
            }
        }

        private FinalizeTempFileBundles(PCollectionView<Integer> pCollectionView, Coder<DestinationT> coder) {
            this.numShardsView = pCollectionView;
            this.destinationCoder = coder;
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        public WriteFilesResult<DestinationT> expand(PCollection<List<FileBasedSink.FileResult<DestinationT>>> pCollection) {
            ArrayList newArrayList = Lists.newArrayList(WriteFiles.this.getSideInputs());
            if (this.numShardsView != null) {
                newArrayList.add(this.numShardsView);
            }
            PCollection pCollection2 = (PCollection) ((PCollection) pCollection.apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(newArrayList))).setCoder(KvCoder.of(this.destinationCoder, StringUtf8Coder.of())).apply(Reshuffle.viaRandomKey());
            return WriteFilesResult.in(pCollection.getPipeline(), new TupleTag("perDestinationOutputFilenames"), pCollection2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$GatherBundlesPerWindowFn.class */
    public static class GatherBundlesPerWindowFn<T> extends DoFn<T, List<T>> {
        private transient Multimap<BoundedWindow, T> bundles;

        private GatherBundlesPerWindowFn() {
            this.bundles = null;
        }

        @DoFn.StartBundle
        public void startBundle() {
            this.bundles = ArrayListMultimap.create();
        }

        @DoFn.ProcessElement
        public void process(DoFn<T, List<T>>.ProcessContext processContext, BoundedWindow boundedWindow) {
            this.bundles.put(boundedWindow, processContext.element());
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn<T, List<T>>.FinishBundleContext finishBundleContext) throws Exception {
            for (BoundedWindow boundedWindow : this.bundles.keySet()) {
                finishBundleContext.output(Lists.newArrayList(this.bundles.get(boundedWindow)), boundedWindow.maxTimestamp(), boundedWindow);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$GatherResults.class */
    public class GatherResults<ResultT> extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>> {
        private final Coder<ResultT> resultCoder;

        private GatherResults(Coder<ResultT> coder) {
            this.resultCoder = coder;
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        public PCollection<List<ResultT>> expand(PCollection<ResultT> pCollection) {
            return WriteFiles.this.getWindowedWrites() ? (PCollection) ((PCollection) ((PCollection) ((PCollection) ((PCollection) pCollection.apply("Add void key", WithKeys.of((Void) null))).apply("Reshuffle", Reshuffle.of())).apply("Drop key", Values.create())).apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn()))).setCoder(ListCoder.of(this.resultCoder)).apply(Reshuffle.viaRandomKey()) : (PCollection) pCollection.getPipeline().apply(Reify.viewInGlobalWindow((PCollectionView) pCollection.apply(View.asList()), ListCoder.of(this.resultCoder)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$RandomShardingFunction.class */
    public class RandomShardingFunction implements ShardingFunction<UserT, DestinationT> {
        private final Coder<DestinationT> destinationCoder;
        private int shardNumber = -1;

        RandomShardingFunction(Coder<DestinationT> coder) {
            this.destinationCoder = coder;
        }

        @Override // org.apache.beam.sdk.io.ShardingFunction
        public ShardedKey<Integer> assignShardKey(DestinationT destinationt, UserT usert, int i) throws Exception {
            if (this.shardNumber == -1) {
                this.shardNumber = ThreadLocalRandom.current().nextInt(i);
            } else {
                this.shardNumber = (this.shardNumber + 1) % i;
            }
            return ShardedKey.of(Integer.valueOf(WriteFiles.hashDestination(destinationt, this.destinationCoder)), this.shardNumber);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$WriteShardedBundlesToTempFiles.class */
    public class WriteShardedBundlesToTempFiles extends PTransform<PCollection<UserT>, PCollection<FileBasedSink.FileResult<DestinationT>>> {
        private final Coder<DestinationT> destinationCoder;
        private final Coder<FileBasedSink.FileResult<DestinationT>> fileResultCoder;
        private final PCollectionView<Integer> numShardsView;

        private WriteShardedBundlesToTempFiles(Coder<DestinationT> coder, Coder<FileBasedSink.FileResult<DestinationT>> coder2, PCollectionView<Integer> pCollectionView) {
            this.destinationCoder = coder;
            this.fileResultCoder = coder2;
            this.numShardsView = pCollectionView;
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        public PCollection<FileBasedSink.FileResult<DestinationT>> expand(PCollection<UserT> pCollection) {
            ArrayList newArrayList = Lists.newArrayList(WriteFiles.this.getSideInputs());
            if (this.numShardsView != null) {
                newArrayList.add(this.numShardsView);
            }
            return ((PCollection) ((PCollection) ((PCollection) pCollection.apply("ApplyShardingKey", ParDo.of(new ApplyShardingFunctionFn(WriteFiles.this.getShardingFunction() == null ? new RandomShardingFunction(this.destinationCoder) : WriteFiles.this.getShardingFunction(), this.numShardsView)).withSideInputs(newArrayList))).setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), pCollection.getCoder())).apply("GroupIntoShards", GroupByKey.create())).apply("WriteShardsIntoTempFiles", ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(WriteFiles.this.getSideInputs()))).setCoder(this.fileResultCoder);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$WriteShardsIntoTempFilesFn.class */
    public class WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileBasedSink.FileResult<DestinationT>> {
        private WriteShardsIntoTempFilesFn() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v41, types: [org.apache.beam.sdk.io.FileBasedSink$DynamicDestinations] */
        /* JADX WARN: Type inference failed for: r10v0, types: [org.apache.beam.sdk.transforms.DoFn<org.apache.beam.sdk.values.KV<org.apache.beam.sdk.values.ShardedKey<java.lang.Integer>, java.lang.Iterable<UserT>>, org.apache.beam.sdk.io.FileBasedSink$FileResult<DestinationT>>$ProcessContext, org.apache.beam.sdk.transforms.DoFn$ProcessContext] */
        /* JADX WARN: Type inference failed for: r16v2 */
        /* JADX WARN: Type inference failed for: r16v3 */
        /* JADX WARN: Type inference failed for: r16v4, types: [org.apache.beam.sdk.io.FileBasedSink$Writer, java.lang.Object] */
        /* JADX WARN: Type inference failed for: r1v8, types: [org.apache.beam.sdk.io.FileBasedSink$DynamicDestinations] */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileBasedSink.FileResult<DestinationT>>.ProcessContext processContext, BoundedWindow boundedWindow) throws Exception {
            WriteFiles.this.getDynamicDestinations().setSideInputAccessorFromProcessContext(processContext);
            HashMap newHashMap = Maps.newHashMap();
            for (Object obj : (Iterable) ((KV) processContext.element()).getValue()) {
                Object destination = WriteFiles.this.getDynamicDestinations().getDestination(obj);
                FileBasedSink.Writer writer = (FileBasedSink.Writer) newHashMap.get(destination);
                if (writer == 0) {
                    String uuid = UUID.randomUUID().toString();
                    WriteFiles.LOG.info("Opening writer {} for window {} pane {} destination {}", uuid, boundedWindow, processContext.pane(), destination);
                    writer = WriteFiles.this.writeOperation.createWriter();
                    writer.setDestination(destination);
                    writer.open(uuid);
                    newHashMap.put(destination, writer);
                }
                WriteFiles.writeOrClose(writer, WriteFiles.this.getDynamicDestinations().formatRecord(obj));
            }
            for (Map.Entry entry : newHashMap.entrySet()) {
                FileBasedSink.Writer writer2 = (FileBasedSink.Writer) entry.getValue();
                try {
                    writer2.close();
                    int shardNumber = ((ShardedKey) ((KV) processContext.element()).getKey()).getShardNumber();
                    Preconditions.checkArgument(shardNumber != -1, "Shard should have been set, but is unset for element %s", processContext.element());
                    processContext.output(new FileBasedSink.FileResult(writer2.getOutputFile(), shardNumber, boundedWindow, processContext.pane(), entry.getKey()));
                } catch (Exception e) {
                    writer2.cleanup();
                    throw e;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$WriteUnshardedBundlesToTempFiles.class */
    public class WriteUnshardedBundlesToTempFiles extends PTransform<PCollection<UserT>, PCollection<FileBasedSink.FileResult<DestinationT>>> {
        private final Coder<DestinationT> destinationCoder;
        private final Coder<FileBasedSink.FileResult<DestinationT>> fileResultCoder;

        private WriteUnshardedBundlesToTempFiles(Coder<DestinationT> coder, Coder<FileBasedSink.FileResult<DestinationT>> coder2) {
            this.destinationCoder = coder;
            this.fileResultCoder = coder2;
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        public PCollection<FileBasedSink.FileResult<DestinationT>> expand(PCollection<UserT> pCollection) {
            if (WriteFiles.this.getMaxNumWritersPerBundle() < 0) {
                return ((PCollection) pCollection.apply("WritedUnshardedBundles", ParDo.of(new WriteUnshardedTempFilesFn(null, this.destinationCoder)).withSideInputs(WriteFiles.this.getSideInputs()))).setCoder(this.fileResultCoder);
            }
            TupleTag<OutputT> tupleTag = new TupleTag<>("writtenRecords");
            TupleTag tupleTag2 = new TupleTag("unwrittenRecords");
            PCollectionTuple pCollectionTuple = (PCollectionTuple) pCollection.apply("WriteUnshardedBundles", ParDo.of(new WriteUnshardedTempFilesFn(tupleTag2, this.destinationCoder)).withSideInputs(WriteFiles.this.getSideInputs()).withOutputTags(tupleTag, TupleTagList.of((TupleTag<?>) tupleTag2)));
            PCollection coder = pCollectionTuple.get(tupleTag).setCoder(this.fileResultCoder);
            return ((PCollection) PCollectionList.of(coder).and((PCollection) ((PCollection) ((PCollection) pCollectionTuple.get(tupleTag2).setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), pCollection.getCoder())).apply("GroupUnwritten", GroupByKey.create())).apply("WriteUnwritten", ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(WriteFiles.this.getSideInputs()))).setCoder(this.fileResultCoder).apply("DropShardNum", ParDo.of(new DoFn<FileBasedSink.FileResult<DestinationT>, FileBasedSink.FileResult<DestinationT>>() { // from class: org.apache.beam.sdk.io.WriteFiles.WriteUnshardedBundlesToTempFiles.1
                @DoFn.ProcessElement
                public void process(DoFn<FileBasedSink.FileResult<DestinationT>, FileBasedSink.FileResult<DestinationT>>.ProcessContext processContext) {
                    processContext.output(((FileBasedSink.FileResult) processContext.element()).withShard(-1));
                }
            }))).apply(Flatten.pCollections())).setCoder(this.fileResultCoder);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$WriteUnshardedTempFilesFn.class */
    public class WriteUnshardedTempFilesFn extends DoFn<UserT, FileBasedSink.FileResult<DestinationT>> {
        private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
        private final Coder<DestinationT> destinationCoder;
        private Map<WriterKey<DestinationT>, FileBasedSink.Writer<DestinationT, OutputT>> writers;
        private int spilledShardNum = -1;

        WriteUnshardedTempFilesFn(TupleTag<KV<ShardedKey<Integer>, UserT>> tupleTag, Coder<DestinationT> coder) {
            this.unwrittenRecordsTag = tupleTag;
            this.destinationCoder = coder;
        }

        @DoFn.StartBundle
        public void startBundle(DoFn<UserT, FileBasedSink.FileResult<DestinationT>>.StartBundleContext startBundleContext) {
            this.writers = Maps.newHashMap();
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r13v2, types: [org.apache.beam.sdk.io.FileBasedSink$Writer, java.lang.Object] */
        @DoFn.ProcessElement
        public void processElement(DoFn<UserT, FileBasedSink.FileResult<DestinationT>>.ProcessContext processContext, BoundedWindow boundedWindow) throws Exception {
            WriteFiles.this.getDynamicDestinations().setSideInputAccessorFromProcessContext(processContext);
            PaneInfo pane = processContext.pane();
            Object destination = WriteFiles.this.getDynamicDestinations().getDestination(processContext.element());
            WriterKey writerKey = new WriterKey(boundedWindow, processContext.pane(), destination);
            FileBasedSink.Writer<DestinationT, OutputT> writer = this.writers.get(writerKey);
            FileBasedSink.Writer<DestinationT, OutputT> writer2 = writer;
            if (writer == null) {
                if (WriteFiles.this.getMaxNumWritersPerBundle() >= 0 && this.writers.size() > WriteFiles.this.getMaxNumWritersPerBundle()) {
                    if (this.spilledShardNum == -1) {
                        this.spilledShardNum = ThreadLocalRandom.current().nextInt(10);
                    } else {
                        this.spilledShardNum = (this.spilledShardNum + 1) % 10;
                    }
                    processContext.output((TupleTag<KV<ShardedKey<Integer>, UserT>>) this.unwrittenRecordsTag, KV.of(ShardedKey.of(Integer.valueOf(WriteFiles.hashDestination(destination, this.destinationCoder)), this.spilledShardNum), processContext.element()));
                    return;
                }
                String uuid = UUID.randomUUID().toString();
                WriteFiles.LOG.info("Opening writer {} for window {} pane {} destination {}", uuid, boundedWindow, pane, destination);
                ?? createWriter = WriteFiles.this.writeOperation.createWriter();
                createWriter.setDestination(destination);
                createWriter.open(uuid);
                this.writers.put(writerKey, createWriter);
                WriteFiles.LOG.debug("Done opening writer");
                writer2 = createWriter;
            }
            WriteFiles.writeOrClose(writer2, WriteFiles.this.getDynamicDestinations().formatRecord(processContext.element()));
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn<UserT, FileBasedSink.FileResult<DestinationT>>.FinishBundleContext finishBundleContext) throws Exception {
            for (Map.Entry<WriterKey<DestinationT>, FileBasedSink.Writer<DestinationT, OutputT>> entry : this.writers.entrySet()) {
                WriterKey<DestinationT> key = entry.getKey();
                FileBasedSink.Writer<DestinationT, OutputT> value = entry.getValue();
                try {
                    value.close();
                    BoundedWindow boundedWindow = ((WriterKey) key).window;
                    finishBundleContext.output(new FileBasedSink.FileResult(value.getOutputFile(), -1, boundedWindow, ((WriterKey) key).paneInfo, ((WriterKey) key).destination), boundedWindow.maxTimestamp(), boundedWindow);
                } catch (Exception e) {
                    value.cleanup();
                    throw e;
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$WriterKey.class */
    private static class WriterKey<DestinationT> {
        private final BoundedWindow window;
        private final PaneInfo paneInfo;
        private final DestinationT destination;

        WriterKey(BoundedWindow boundedWindow, PaneInfo paneInfo, DestinationT destinationt) {
            this.window = boundedWindow;
            this.paneInfo = paneInfo;
            this.destination = destinationt;
        }

        @EnsuresNonNullIf(expression = {"#1"}, result = true)
        @Pure
        public boolean equals(Object obj) {
            if (!(obj instanceof WriterKey)) {
                return false;
            }
            WriterKey writerKey = (WriterKey) obj;
            return Objects.equal(this.window, writerKey.window) && Objects.equal(this.paneInfo, writerKey.paneInfo) && Objects.equal(this.destination, writerKey.destination);
        }

        @Pure
        public int hashCode() {
            return Objects.hashCode(this.window, this.paneInfo, this.destination);
        }
    }

    public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to(FileBasedSink<UserT, DestinationT, OutputT> fileBasedSink) {
        Preconditions.checkArgument(fileBasedSink != null, "sink can not be null");
        return new AutoValue_WriteFiles.Builder().setSink(fileBasedSink).setComputeNumShards(null).setNumShardsProvider(null).setWindowedWrites(false).setMaxNumWritersPerBundle(20).setSideInputs(fileBasedSink.getDynamicDestinations().getSideInputs()).build();
    }

    public abstract FileBasedSink<UserT, DestinationT, OutputT> getSink();

    public abstract PTransform<PCollection<UserT>, PCollectionView<Integer>> getComputeNumShards();

    public abstract ValueProvider<Integer> getNumShardsProvider();

    public abstract boolean getWindowedWrites();

    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract int getMaxNumWritersPerBundle();

    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract List<PCollectionView<?>> getSideInputs();

    public abstract ShardingFunction<UserT, DestinationT> getShardingFunction();

    abstract Builder<UserT, DestinationT, OutputT> toBuilder();

    @Override // org.apache.beam.sdk.transforms.PTransform
    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
        return PCollectionViews.toAdditionalInputs(getSideInputs());
    }

    public WriteFiles<UserT, DestinationT, OutputT> withNumShards(int i) {
        return i > 0 ? withNumShards(ValueProvider.StaticValueProvider.of(Integer.valueOf(i))) : withRunnerDeterminedSharding();
    }

    public WriteFiles<UserT, DestinationT, OutputT> withNumShards(ValueProvider<Integer> valueProvider) {
        return toBuilder().setNumShardsProvider(valueProvider).build();
    }

    public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(int i) {
        return toBuilder().setMaxNumWritersPerBundle(i).build();
    }

    public WriteFiles<UserT, DestinationT, OutputT> withSideInputs(List<PCollectionView<?>> list) {
        return toBuilder().setSideInputs(list).build();
    }

    public WriteFiles<UserT, DestinationT, OutputT> withSharding(PTransform<PCollection<UserT>, PCollectionView<Integer>> pTransform) {
        Preconditions.checkArgument(pTransform != null, "sharding can not be null. Use withRunnerDeterminedSharding() instead.");
        return toBuilder().setComputeNumShards(pTransform).build();
    }

    public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() {
        return toBuilder().setComputeNumShards(null).setNumShardsProvider(null).build();
    }

    public WriteFiles<UserT, DestinationT, OutputT> withShardingFunction(ShardingFunction<UserT, DestinationT> shardingFunction) {
        return toBuilder().setShardingFunction(shardingFunction).build();
    }

    public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
        return toBuilder().setWindowedWrites(true).build();
    }

    public WriteFiles<UserT, DestinationT, OutputT> withNoSpilling() {
        return toBuilder().setMaxNumWritersPerBundle(-1).build();
    }

    @Override // org.apache.beam.sdk.transforms.PTransform
    public void validate(PipelineOptions pipelineOptions) {
        getSink().validate(pipelineOptions);
    }

    @Override // org.apache.beam.sdk.transforms.PTransform
    public WriteFilesResult<DestinationT> expand(PCollection<UserT> pCollection) {
        if (pCollection.isBounded() == PCollection.IsBounded.UNBOUNDED) {
            Preconditions.checkArgument(getWindowedWrites(), "Must use windowed writes when applying %s to an unbounded PCollection", WriteFiles.class.getSimpleName());
            Preconditions.checkArgument((getComputeNumShards() == null && getNumShardsProvider() == null) ? false : true, "When applying %s to an unbounded PCollection, must specify number of output shards explicitly", WriteFiles.class.getSimpleName());
        }
        this.writeOperation = getSink().createWriteOperation();
        this.writeOperation.setWindowedWrites(getWindowedWrites());
        if (!getWindowedWrites()) {
            pCollection = (PCollection) pCollection.apply("RewindowIntoGlobal", Window.into(new GlobalWindows()).triggering(DefaultTrigger.of()).discardingFiredPanes());
        }
        try {
            Coder<DestinationT> destinationCoderWithDefault = getDynamicDestinations().getDestinationCoderWithDefault(pCollection.getPipeline().getCoderRegistry());
            destinationCoderWithDefault.verifyDeterministic();
            FileBasedSink.FileResultCoder of = FileBasedSink.FileResultCoder.of(pCollection.getWindowingStrategy().getWindowFn().windowCoder(), destinationCoderWithDefault);
            PCollectionView pCollectionView = getComputeNumShards() == null ? null : (PCollectionView) pCollection.apply(getComputeNumShards());
            return (WriteFilesResult) ((PCollection) ((getComputeNumShards() == null && getNumShardsProvider() == null) ? (PCollection) pCollection.apply("WriteUnshardedBundlesToTempFiles", new WriteUnshardedBundlesToTempFiles(destinationCoderWithDefault, of)) : (PCollection) pCollection.apply("WriteShardedBundlesToTempFiles", new WriteShardedBundlesToTempFiles(destinationCoderWithDefault, of, pCollectionView))).apply("GatherTempFileResults", new GatherResults(of))).apply("FinalizeTempFileBundles", new FinalizeTempFileBundles(pCollectionView, destinationCoderWithDefault));
        } catch (CannotProvideCoderException | Coder.NonDeterministicException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.beam.sdk.transforms.PTransform, org.apache.beam.sdk.transforms.display.HasDisplayData
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(DisplayData.item("sink", getSink().getClass()).withLabel("WriteFiles Sink")).include("sink", getSink());
        if (getComputeNumShards() != null) {
            builder.include("sharding", getComputeNumShards());
        } else {
            builder.addIfNotNull(DisplayData.item("numShards", getNumShardsProvider()).withLabel("Fixed Number of Shards"));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public FileBasedSink.DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
        return this.writeOperation.getSink().getDynamicDestinations();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <DestinationT, OutputT> void writeOrClose(FileBasedSink.Writer<DestinationT, OutputT> writer, OutputT outputt) throws Exception {
        try {
            writer.write(outputt);
        } catch (Exception e) {
            try {
                writer.close();
                writer.cleanup();
            } catch (Exception e2) {
                if (e2 instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <DestinationT> int hashDestination(DestinationT destinationt, Coder<DestinationT> coder) throws IOException {
        return Hashing.murmur3_32().hashBytes(CoderUtils.encodeToByteArray(coder, destinationt)).asInt();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public List<KV<FileBasedSink.FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(List<FileBasedSink.FileResult<DestinationT>> list, Integer num) throws Exception {
        ArrayListMultimap create = ArrayListMultimap.create();
        for (FileBasedSink.FileResult<DestinationT> fileResult : list) {
            create.put(KV.of(fileResult.getDestination(), fileResult.getWindow()), fileResult);
        }
        ArrayList newArrayList = Lists.newArrayList();
        for (Map.Entry entry : create.asMap().entrySet()) {
            KV kv = (KV) entry.getKey();
            newArrayList.addAll(this.writeOperation.finalizeDestination(kv.getKey(), (BoundedWindow) kv.getValue(), num, (Collection) entry.getValue()));
        }
        return newArrayList;
    }
}
