package org.apache.beam.sdk.io;

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.repackaged.com.google.common.base.Strings;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Ordering;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink.class */
public abstract class FileBasedSink<T> extends Sink<T> {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
    protected final WritableByteChannelFactory writableByteChannelFactory;
    protected final ValueProvider<String> baseOutputFilename;
    protected final String extension;
    protected final String fileNamingTemplate;

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$CompressionType.class */
    public enum CompressionType implements WritableByteChannelFactory {
        UNCOMPRESSED("", MimeTypes.TEXT) { // from class: org.apache.beam.sdk.io.FileBasedSink.CompressionType.1
            @Override // org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
            public WritableByteChannel create(WritableByteChannel writableByteChannel) throws IOException {
                return writableByteChannel;
            }
        },
        GZIP(".gz", MimeTypes.BINARY) { // from class: org.apache.beam.sdk.io.FileBasedSink.CompressionType.2
            @Override // org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
            public WritableByteChannel create(WritableByteChannel writableByteChannel) throws IOException {
                return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(writableByteChannel), true));
            }
        },
        BZIP2(".bz2", MimeTypes.BINARY) { // from class: org.apache.beam.sdk.io.FileBasedSink.CompressionType.3
            @Override // org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
            public WritableByteChannel create(WritableByteChannel writableByteChannel) throws IOException {
                return Channels.newChannel((OutputStream) new BZip2CompressorOutputStream(Channels.newOutputStream(writableByteChannel)));
            }
        };

        private String filenameSuffix;
        private String mimeType;

        CompressionType(String str, String str2) {
            this.filenameSuffix = str;
            this.mimeType = str2;
        }

        @Override // org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
        public String getFilenameSuffix() {
            return this.filenameSuffix;
        }

        @Override // org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
        public String getMimeType() {
            return this.mimeType;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FileBasedWriteOperation.class */
    public static abstract class FileBasedWriteOperation<T> extends Sink.WriteOperation<T, FileResult> {
        protected final FileBasedSink<T> sink;
        protected final ValueProvider<String> tempDirectory;

        /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FileBasedWriteOperation$TemporaryDirectoryBuilder.class */
        private static class TemporaryDirectoryBuilder implements SerializableFunction<String, String> {
            Instant now;

            private TemporaryDirectoryBuilder() {
                this.now = Instant.now();
            }

            @Override // org.apache.beam.sdk.transforms.SerializableFunction
            public String apply(String str) {
                try {
                    Path path = IOChannelUtils.getFactory(str).toPath(str);
                    return path.resolveSibling("temp-beam-" + path.getFileName() + "-" + this.now.toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss"))).toString();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        protected static String buildTemporaryFilename(String str, String str2) throws IOException {
            return IOChannelUtils.getFactory(str).resolve(str, str2);
        }

        public FileBasedWriteOperation(FileBasedSink<T> fileBasedSink) {
            this(fileBasedSink, ValueProvider.NestedValueProvider.of(fileBasedSink.getBaseOutputFilenameProvider(), new TemporaryDirectoryBuilder()));
        }

        public FileBasedWriteOperation(FileBasedSink<T> fileBasedSink, String str) {
            this(fileBasedSink, ValueProvider.StaticValueProvider.of(str));
        }

        private FileBasedWriteOperation(FileBasedSink<T> fileBasedSink, ValueProvider<String> valueProvider) {
            this.sink = fileBasedSink;
            this.tempDirectory = valueProvider;
        }

        @Override // org.apache.beam.sdk.io.Sink.WriteOperation
        public abstract FileBasedWriter<T> createWriter(PipelineOptions pipelineOptions) throws Exception;

        @Override // org.apache.beam.sdk.io.Sink.WriteOperation
        public void initialize(PipelineOptions pipelineOptions) throws Exception {
        }

        @Override // org.apache.beam.sdk.io.Sink.WriteOperation
        public void finalize(Iterable<FileResult> iterable, PipelineOptions pipelineOptions) throws Exception {
            ArrayList arrayList = new ArrayList();
            for (FileResult fileResult : iterable) {
                FileBasedSink.LOG.debug("Temporary bundle output file {} will be copied.", fileResult.getFilename());
                arrayList.add(fileResult.getFilename());
            }
            copyToOutputFiles(arrayList, pipelineOptions);
            removeTemporaryFiles(arrayList, pipelineOptions);
        }

        protected final List<String> copyToOutputFiles(List<String> list, PipelineOptions pipelineOptions) throws IOException {
            int size = list.size();
            List<String> sortedCopy = Ordering.natural().sortedCopy(list);
            List<String> generateDestinationFilenames = generateDestinationFilenames(size);
            if (size > 0) {
                FileBasedSink.LOG.debug("Copying {} files.", Integer.valueOf(size));
                IOChannelUtils.getFactory(generateDestinationFilenames.get(0)).copy(sortedCopy, generateDestinationFilenames);
            } else {
                FileBasedSink.LOG.info("No output files to write.");
            }
            return generateDestinationFilenames;
        }

        protected final List<String> generateDestinationFilenames(int i) {
            ArrayList arrayList = new ArrayList();
            String str = getSink().extension;
            String str2 = getSink().baseOutputFilename.get();
            String str3 = getSink().fileNamingTemplate;
            String fileExtension = FileBasedSink.getFileExtension(str);
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(IOChannelUtils.constructName(str2, str3, fileExtension, i2, i));
            }
            int size = new HashSet(arrayList).size();
            Preconditions.checkState(size == i, "Shard name template '%s' only generated %s distinct file names for %s files.", str3, Integer.valueOf(size), Integer.valueOf(i));
            return arrayList;
        }

        protected final void removeTemporaryFiles(List<String> list, PipelineOptions pipelineOptions) throws IOException {
            String str = this.tempDirectory.get();
            FileBasedSink.LOG.debug("Removing temporary bundle output files in {}.", str);
            IOChannelFactory factory = IOChannelUtils.getFactory(str);
            HashSet hashSet = new HashSet();
            try {
                hashSet.addAll(factory.match(factory.resolve(str, "*")));
            } catch (Exception e) {
                FileBasedSink.LOG.warn("Failed to match temporary files under: [{}].", str);
            }
            HashSet hashSet2 = new HashSet(hashSet);
            hashSet2.addAll(list);
            FileBasedSink.LOG.debug("Removing {} temporary files found under {} ({} matched glob, {} known files)", new Object[]{Integer.valueOf(hashSet2.size()), str, Integer.valueOf(hashSet.size()), Integer.valueOf(hashSet2.size() - hashSet.size())});
            try {
                factory.remove(hashSet2);
                factory.remove(ImmutableList.of(str));
            } catch (Exception e2) {
                FileBasedSink.LOG.warn("Failed to remove temporary directory: [{}].", str);
            }
        }

        @Override // org.apache.beam.sdk.io.Sink.WriteOperation
        public Coder<FileResult> getWriterResultCoder() {
            return SerializableCoder.of(FileResult.class);
        }

        @Override // org.apache.beam.sdk.io.Sink.WriteOperation
        public FileBasedSink<T> getSink() {
            return this.sink;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FileBasedWriter.class */
    public static abstract class FileBasedWriter<T> extends Sink.Writer<T, FileResult> {
        private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class);
        final FileBasedWriteOperation<T> writeOperation;
        private String id;
        private String filename;
        private WritableByteChannel channel;
        protected String mimeType = MimeTypes.TEXT;

        public FileBasedWriter(FileBasedWriteOperation<T> fileBasedWriteOperation) {
            Preconditions.checkNotNull(fileBasedWriteOperation);
            this.writeOperation = fileBasedWriteOperation;
        }

        protected abstract void prepareWrite(WritableByteChannel writableByteChannel) throws Exception;

        protected void writeHeader() throws Exception {
        }

        protected void writeFooter() throws Exception {
        }

        @Override // org.apache.beam.sdk.io.Sink.Writer
        public final void open(String str) throws Exception {
            this.id = str;
            this.filename = FileBasedWriteOperation.buildTemporaryFilename(getWriteOperation().tempDirectory.get(), str);
            LOG.debug("Opening {}.", this.filename);
            WritableByteChannelFactory writableByteChannelFactory = getWriteOperation().getSink().writableByteChannelFactory;
            this.mimeType = writableByteChannelFactory.getMimeType();
            this.channel = writableByteChannelFactory.create(IOChannelUtils.create(this.filename, this.mimeType));
            try {
                prepareWrite(this.channel);
                LOG.debug("Writing header to {}.", this.filename);
                writeHeader();
                LOG.debug("Starting write of bundle {} to {}.", this.id, this.filename);
            } catch (Exception e) {
                try {
                    LOG.error("Writing header to {} failed, closing channel.", this.filename);
                    this.channel.close();
                } catch (IOException e2) {
                    LOG.error("Closing channel for {} failed: {}", this.filename, e2.getMessage());
                }
                throw e;
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.beam.sdk.io.Sink.Writer
        public final FileResult close() throws Exception {
            WritableByteChannel writableByteChannel = this.channel;
            Throwable th = null;
            try {
                LOG.debug("Writing footer to {}.", this.filename);
                writeFooter();
                if (writableByteChannel != null) {
                    if (0 != 0) {
                        try {
                            writableByteChannel.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        writableByteChannel.close();
                    }
                }
                FileResult fileResult = new FileResult(this.filename);
                LOG.debug("Result for bundle {}: {}", this.id, this.filename);
                return fileResult;
            } catch (Throwable th3) {
                if (writableByteChannel != null) {
                    if (0 != 0) {
                        try {
                            writableByteChannel.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        writableByteChannel.close();
                    }
                }
                throw th3;
            }
        }

        @Override // org.apache.beam.sdk.io.Sink.Writer
        public FileBasedWriteOperation<T> getWriteOperation() {
            return this.writeOperation;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FileResult.class */
    public static final class FileResult implements Serializable {
        private final String filename;

        public FileResult(String str) {
            this.filename = str;
        }

        public String getFilename() {
            return this.filename;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$WritableByteChannelFactory.class */
    public interface WritableByteChannelFactory extends Serializable {
        WritableByteChannel create(WritableByteChannel writableByteChannel) throws IOException;

        String getMimeType();

        @Nullable
        String getFilenameSuffix();
    }

    public FileBasedSink(String str, String str2) {
        this(str, str2, ShardNameTemplate.INDEX_OF_MAX);
    }

    public FileBasedSink(String str, String str2, WritableByteChannelFactory writableByteChannelFactory) {
        this(ValueProvider.StaticValueProvider.of(str), str2, ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory);
    }

    public FileBasedSink(String str, String str2, String str3) {
        this(ValueProvider.StaticValueProvider.of(str), str2, str3, CompressionType.UNCOMPRESSED);
    }

    public FileBasedSink(ValueProvider<String> valueProvider, String str, String str2, WritableByteChannelFactory writableByteChannelFactory) {
        this.writableByteChannelFactory = writableByteChannelFactory;
        this.baseOutputFilename = valueProvider;
        if (Strings.isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
            this.extension = str;
        } else {
            this.extension = str + getFileExtension(writableByteChannelFactory.getFilenameSuffix());
        }
        this.fileNamingTemplate = str2;
    }

    public ValueProvider<String> getBaseOutputFilenameProvider() {
        return this.baseOutputFilename;
    }

    @Override // org.apache.beam.sdk.io.Sink
    public void validate(PipelineOptions pipelineOptions) {
    }

    @Override // org.apache.beam.sdk.io.Sink
    public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions pipelineOptions);

    @Override // org.apache.beam.sdk.io.Sink, org.apache.beam.sdk.transforms.display.HasDisplayData
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        Object[] objArr = new Object[3];
        objArr[0] = this.baseOutputFilename.isAccessible() ? this.baseOutputFilename.get() : this.baseOutputFilename.toString();
        objArr[1] = this.fileNamingTemplate;
        objArr[2] = getFileExtension(this.extension);
        builder.add(DisplayData.item("fileNamePattern", String.format("%s%s%s", objArr)).withLabel("File Name Pattern"));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String getFileExtension(String str) {
        return (str == null || str.isEmpty()) ? "" : str.startsWith(".") ? str : "." + str;
    }
}
