package org.apache.beam.sdk.io;

import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/Write.class */
public class Write {
    private static final Logger LOG = LoggerFactory.getLogger(Write.class);

    /* loaded from: input_file:org/apache/beam/sdk/io/Write$Bound.class */
    public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
        private final Sink<T> sink;

        private Bound(Sink<T> sink) {
            this.sink = sink;
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        public PDone apply(PCollection<T> pCollection) {
            PipelineOptions options = pCollection.getPipeline().getOptions();
            this.sink.validate(options);
            return createWrite(pCollection, this.sink.createWriteOperation(options));
        }

        @Override // org.apache.beam.sdk.transforms.PTransform, org.apache.beam.sdk.transforms.display.HasDisplayData
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("sink", this.sink.getClass()).withLabel("Write Sink")).include(this.sink);
        }

        public Sink<T> getSink() {
            return this.sink;
        }

        private <WriteT> PDone createWrite(PCollection<T> pCollection, Sink.WriteOperation<T, WriteT> writeOperation) {
            Pipeline pipeline = pCollection.getPipeline();
            SerializableCoder of = SerializableCoder.of(writeOperation.getClass());
            PCollection<T> coder = ((PCollection) ((PCollection) pipeline.apply(Create.of(writeOperation).withCoder(of))).apply("Initialize", ParDo.of(new DoFn<Sink.WriteOperation<T, WriteT>, Sink.WriteOperation<T, WriteT>>() { // from class: org.apache.beam.sdk.io.Write.Bound.1
                @Override // org.apache.beam.sdk.transforms.DoFn
                public void processElement(DoFn<Sink.WriteOperation<T, WriteT>, Sink.WriteOperation<T, WriteT>>.ProcessContext processContext) throws Exception {
                    Sink.WriteOperation<T, WriteT> element = processContext.element();
                    Write.LOG.info("Initializing write operation {}", element);
                    element.initialize(processContext.getPipelineOptions());
                    Write.LOG.debug("Done initializing write operation {}", element);
                    processContext.output(element);
                }
            }))).setCoder((Coder) of);
            final PCollectionView<?> pCollectionView = (PCollectionView) coder.apply(View.asSingleton());
            final PCollectionView<?> pCollectionView2 = (PCollectionView) ((PCollection) ((PCollection) pCollection.apply(Window.into(new GlobalWindows()))).apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>() { // from class: org.apache.beam.sdk.io.Write.Bound.2
                private Sink.Writer<T, WriteT> writer = null;

                @Override // org.apache.beam.sdk.transforms.DoFn
                public void processElement(DoFn<T, WriteT>.ProcessContext processContext) throws Exception {
                    if (this.writer == null) {
                        Sink.WriteOperation writeOperation2 = (Sink.WriteOperation) processContext.sideInput(pCollectionView);
                        Write.LOG.info("Opening writer for write operation {}", writeOperation2);
                        this.writer = writeOperation2.createWriter(processContext.getPipelineOptions());
                        this.writer.open(UUID.randomUUID().toString());
                        Write.LOG.debug("Done opening writer {} for operation {}", this.writer, pCollectionView);
                    }
                    try {
                        this.writer.write(processContext.element());
                    } catch (Exception e) {
                        try {
                            this.writer.close();
                        } catch (Exception e2) {
                        }
                        throw e;
                    }
                }

                @Override // org.apache.beam.sdk.transforms.DoFn
                public void finishBundle(DoFn<T, WriteT>.Context context) throws Exception {
                    if (this.writer != null) {
                        context.output(this.writer.close());
                    }
                }

                @Override // org.apache.beam.sdk.transforms.DoFn, org.apache.beam.sdk.transforms.display.HasDisplayData
                public void populateDisplayData(DisplayData.Builder builder) {
                    Bound.this.populateDisplayData(builder);
                }
            }).withSideInputs(pCollectionView))).setCoder((Coder) writeOperation.getWriterResultCoder()).apply(View.asIterable());
            return PDone.in(pCollection.getPipeline());
        }
    }

    public static <T> Bound<T> to(Sink<T> sink) {
        return new Bound<>(sink);
    }
}
