package com.google.cloud.dataflow.sdk.io;

import com.google.api.client.googleapis.batch.BatchRequest;
import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.StorageRequest;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.GcsOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.util.FileIOChannelFactory;
import com.google.cloud.dataflow.sdk.util.GcsIOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSink.class */
public abstract class FileBasedSink<T> extends Sink<T> {
    protected final String baseOutputFilename;
    protected final String extension;
    protected final String fileNamingTemplate;

    @NotThreadSafe
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSink$BatchHelper.class */
    private static class BatchHelper {
        private final BatchRequest batch;
        private final long maxRequestsPerBatch;
        private boolean flushing = false;
        private final List<QueueRequestCallback> pendingBatchEntries = new LinkedList();

        /* JADX INFO: Access modifiers changed from: protected */
        /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSink$BatchHelper$QueueRequestCallback.class */
        public interface QueueRequestCallback {
            void enqueue() throws IOException;
        }

        public BatchHelper(HttpRequestInitializer httpRequestInitializer, Storage storage, long j) {
            this.batch = storage.batch(httpRequestInitializer);
            this.maxRequestsPerBatch = j;
        }

        public <T> void queue(final StorageRequest<T> storageRequest, final JsonBatchCallback<T> jsonBatchCallback) throws IOException {
            this.pendingBatchEntries.add(new QueueRequestCallback() { // from class: com.google.cloud.dataflow.sdk.io.FileBasedSink.BatchHelper.1
                @Override // com.google.cloud.dataflow.sdk.io.FileBasedSink.BatchHelper.QueueRequestCallback
                public void enqueue() throws IOException {
                    storageRequest.queue(BatchHelper.this.batch, jsonBatchCallback);
                }
            });
            flushIfPossibleAndRequired();
        }

        private void flushIfPossibleAndRequired() throws IOException {
            if (this.pendingBatchEntries.size() > this.maxRequestsPerBatch) {
                flushIfPossible();
            }
        }

        private void flushIfPossible() throws IOException {
            if (this.flushing || this.pendingBatchEntries.size() <= 0) {
                return;
            }
            this.flushing = true;
            while (this.batch.size() < this.maxRequestsPerBatch && this.pendingBatchEntries.size() > 0) {
                try {
                    this.pendingBatchEntries.remove(0).enqueue();
                } finally {
                    this.flushing = false;
                }
            }
            this.batch.execute();
        }

        public void flush() throws IOException {
            flushIfPossible();
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSink$FileBasedWriteOperation.class */
    public static abstract class FileBasedWriteOperation<T> extends Sink.WriteOperation<T, FileResult> {
        private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriteOperation.class);
        protected final FileBasedSink<T> sink;
        protected final TemporaryFileRetention temporaryFileRetention;
        protected final String baseTemporaryFilename;
        protected static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-";

        /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSink$FileBasedWriteOperation$TemporaryFileRetention.class */
        public enum TemporaryFileRetention {
            KEEP,
            REMOVE
        }

        protected static final String buildTemporaryFilename(String str, String str2) {
            String valueOf = String.valueOf(TEMPORARY_FILENAME_SEPARATOR);
            return new StringBuilder(0 + String.valueOf(str).length() + String.valueOf(valueOf).length() + String.valueOf(str2).length()).append(str).append(valueOf).append(str2).toString();
        }

        public FileBasedWriteOperation(FileBasedSink<T> fileBasedSink) {
            this(fileBasedSink, fileBasedSink.baseOutputFilename);
        }

        public FileBasedWriteOperation(FileBasedSink<T> fileBasedSink, String str) {
            this(fileBasedSink, str, TemporaryFileRetention.REMOVE);
        }

        public FileBasedWriteOperation(FileBasedSink<T> fileBasedSink, String str, TemporaryFileRetention temporaryFileRetention) {
            this.sink = fileBasedSink;
            this.baseTemporaryFilename = str;
            this.temporaryFileRetention = temporaryFileRetention;
        }

        @Override // com.google.cloud.dataflow.sdk.io.Sink.WriteOperation
        public abstract FileBasedWriter<T> createWriter(PipelineOptions pipelineOptions) throws Exception;

        @Override // com.google.cloud.dataflow.sdk.io.Sink.WriteOperation
        public void initialize(PipelineOptions pipelineOptions) throws Exception {
        }

        @Override // com.google.cloud.dataflow.sdk.io.Sink.WriteOperation
        public void finalize(Iterable<FileResult> iterable, PipelineOptions pipelineOptions) throws Exception {
            ArrayList arrayList = new ArrayList();
            for (FileResult fileResult : iterable) {
                LOG.debug("Temporary bundle output file {} will be copied.", fileResult.getFilename());
                arrayList.add(fileResult.getFilename());
            }
            copyToOutputFiles(arrayList, pipelineOptions);
            if (this.temporaryFileRetention == TemporaryFileRetention.REMOVE) {
                removeTemporaryFiles(pipelineOptions);
            }
        }

        protected final List<String> copyToOutputFiles(List<String> list, PipelineOptions pipelineOptions) throws IOException {
            int size = list.size();
            ArrayList arrayList = new ArrayList();
            List<String> generateDestinationFilenames = generateDestinationFilenames(size);
            arrayList.addAll(list);
            Collections.sort(arrayList);
            if (size > 0) {
                LOG.debug("Copying {} files.", Integer.valueOf(size));
                FileOperationsFactory.getFileOperations(generateDestinationFilenames.get(0), pipelineOptions).copy(arrayList, generateDestinationFilenames);
            } else {
                LOG.info("No output files to write.");
            }
            return generateDestinationFilenames;
        }

        protected final List<String> generateDestinationFilenames(int i) {
            String str;
            ArrayList arrayList = new ArrayList();
            String str2 = getSink().extension;
            String str3 = getSink().baseOutputFilename;
            String str4 = getSink().fileNamingTemplate;
            if (str2.length() == 0) {
                str = str2;
            } else {
                String valueOf = String.valueOf(str2);
                if (valueOf.length() != 0) {
                    str = ".".concat(valueOf);
                } else {
                    str = r1;
                    String str5 = new String(".");
                }
            }
            String str6 = str;
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(IOChannelUtils.constructName(str3, str4, str6, i2, i));
            }
            return arrayList;
        }

        protected final void removeTemporaryFiles(PipelineOptions pipelineOptions) throws IOException {
            String buildTemporaryFilename = buildTemporaryFilename(this.baseTemporaryFilename, "*");
            LOG.debug("Finding temporary bundle output files matching {}.", buildTemporaryFilename);
            FileOperations fileOperations = FileOperationsFactory.getFileOperations(buildTemporaryFilename, pipelineOptions);
            Collection<String> match = IOChannelUtils.getFactory(buildTemporaryFilename).match(buildTemporaryFilename);
            LOG.debug("{} temporary files matched {}", Integer.valueOf(match.size()), buildTemporaryFilename);
            LOG.debug("Removing {} files.", Integer.valueOf(match.size()));
            fileOperations.remove(match);
        }

        @Override // com.google.cloud.dataflow.sdk.io.Sink.WriteOperation
        public Coder<FileResult> getWriterResultCoder() {
            return SerializableCoder.of(FileResult.class);
        }

        @Override // com.google.cloud.dataflow.sdk.io.Sink.WriteOperation
        public FileBasedSink<T> getSink() {
            return this.sink;
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSink$FileBasedWriter.class */
    public static abstract class FileBasedWriter<T> extends Sink.Writer<T, FileResult> {
        private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class);
        final FileBasedWriteOperation<T> writeOperation;
        private String id;
        private String filename;
        private WritableByteChannel channel;
        protected String mimeType = MimeTypes.TEXT;

        public FileBasedWriter(FileBasedWriteOperation<T> fileBasedWriteOperation) {
            Preconditions.checkNotNull(fileBasedWriteOperation);
            this.writeOperation = fileBasedWriteOperation;
        }

        protected abstract void prepareWrite(WritableByteChannel writableByteChannel) throws Exception;

        protected void writeHeader() throws Exception {
        }

        protected void writeFooter() throws Exception {
        }

        @Override // com.google.cloud.dataflow.sdk.io.Sink.Writer
        public final void open(String str) throws Exception {
            this.id = str;
            this.filename = FileBasedWriteOperation.buildTemporaryFilename(getWriteOperation().baseTemporaryFilename, str);
            LOG.debug("Opening {}.", this.filename);
            this.channel = IOChannelUtils.create(this.filename, this.mimeType);
            try {
                prepareWrite(this.channel);
                LOG.debug("Writing header to {}.", this.filename);
                writeHeader();
                LOG.debug("Starting write of bundle {} to {}.", this.id, this.filename);
            } catch (Exception e) {
                try {
                    LOG.error("Writing header to {} failed, closing channel.", this.filename);
                    this.channel.close();
                } catch (IOException e2) {
                    LOG.error("Closing channel for {} failed: {}", this.filename, e2.getMessage());
                }
                throw e;
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // com.google.cloud.dataflow.sdk.io.Sink.Writer
        public final FileResult close() throws Exception {
            WritableByteChannel writableByteChannel = this.channel;
            Throwable th = null;
            try {
                LOG.debug("Writing footer to {}.", this.filename);
                writeFooter();
                if (writableByteChannel != null) {
                    if (0 != 0) {
                        try {
                            writableByteChannel.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        writableByteChannel.close();
                    }
                }
                FileResult fileResult = new FileResult(this.filename);
                LOG.debug("Result for bundle {}: {}", this.id, this.filename);
                return fileResult;
            } catch (Throwable th3) {
                if (writableByteChannel != null) {
                    if (0 != 0) {
                        try {
                            writableByteChannel.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        writableByteChannel.close();
                    }
                }
                throw th3;
            }
        }

        @Override // com.google.cloud.dataflow.sdk.io.Sink.Writer
        public FileBasedWriteOperation<T> getWriteOperation() {
            return this.writeOperation;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSink$FileOperations.class */
    public interface FileOperations {
        void copy(List<String> list, List<String> list2) throws IOException;

        void remove(Collection<String> collection) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSink$FileOperationsFactory.class */
    public static class FileOperationsFactory {
        private FileOperationsFactory() {
        }

        public static FileOperations getFileOperations(String str, PipelineOptions pipelineOptions) throws IOException {
            IOChannelFactory factory = IOChannelUtils.getFactory(str);
            if (factory instanceof GcsIOChannelFactory) {
                return new GcsOperations(pipelineOptions);
            }
            if (factory instanceof FileIOChannelFactory) {
                return new LocalFileOperations();
            }
            throw new IOException("Unrecognized file system.");
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSink$FileResult.class */
    public static final class FileResult implements Serializable {
        private final String filename;

        public FileResult(String str) {
            this.filename = str;
        }

        public String getFilename() {
            return this.filename;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSink$GcsOperations.class */
    public static class GcsOperations implements FileOperations {
        private static final Logger LOG = LoggerFactory.getLogger(GcsOperations.class);
        private static final int MAX_REQUESTS_PER_BATCH = 1000;
        private ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
        private GcsOptions gcsOptions;
        private Storage gcs;
        private BatchHelper batchHelper;

        public GcsOperations(PipelineOptions pipelineOptions) {
            this.gcsOptions = (GcsOptions) pipelineOptions.as(GcsOptions.class);
            this.gcs = Transport.newStorageClient(this.gcsOptions).build();
            this.batchHelper = new BatchHelper(this.gcs.getRequestFactory().getInitializer(), this.gcs, 1000L);
        }

        @Override // com.google.cloud.dataflow.sdk.io.FileBasedSink.FileOperations
        public void copy(List<String> list, List<String> list2) throws IOException {
            Preconditions.checkArgument(list.size() == list2.size(), String.format("Number of source files {} must equal number of destination files {}", Integer.valueOf(list.size()), Integer.valueOf(list2.size())));
            for (int i = 0; i < list.size(); i++) {
                final GcsPath fromUri = GcsPath.fromUri(list.get(i));
                final GcsPath fromUri2 = GcsPath.fromUri(list2.get(i));
                LOG.debug("Copying {} to {}", fromUri, fromUri2);
                this.batchHelper.queue(this.gcs.objects().copy(fromUri.getBucket(), fromUri.getObject(), fromUri2.getBucket(), fromUri2.getObject(), (StorageObject) null), new JsonBatchCallback<StorageObject>() { // from class: com.google.cloud.dataflow.sdk.io.FileBasedSink.GcsOperations.1
                    public void onSuccess(StorageObject storageObject, HttpHeaders httpHeaders) {
                        GcsOperations.LOG.debug("Successfully copied {} to {}", fromUri, fromUri2);
                    }

                    public void onFailure(GoogleJsonError googleJsonError, HttpHeaders httpHeaders) throws IOException {
                        if (!GcsOperations.this.errorExtractor.itemNotFound(googleJsonError)) {
                            throw new IOException(googleJsonError.toString());
                        }
                        GcsOperations.LOG.debug("{} does not exist.", fromUri);
                    }
                });
            }
            this.batchHelper.flush();
        }

        @Override // com.google.cloud.dataflow.sdk.io.FileBasedSink.FileOperations
        public void remove(Collection<String> collection) throws IOException {
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                final GcsPath fromUri = GcsPath.fromUri(it.next());
                Logger logger = LOG;
                String valueOf = String.valueOf(fromUri);
                logger.debug(new StringBuilder(10 + String.valueOf(valueOf).length()).append("Removing: ").append(valueOf).toString());
                this.batchHelper.queue(this.gcs.objects().delete(fromUri.getBucket(), fromUri.getObject()), new JsonBatchCallback<Void>() { // from class: com.google.cloud.dataflow.sdk.io.FileBasedSink.GcsOperations.2
                    public void onSuccess(Void r5, HttpHeaders httpHeaders) throws IOException {
                        GcsOperations.LOG.debug("Successfully removed {}", fromUri);
                    }

                    public void onFailure(GoogleJsonError googleJsonError, HttpHeaders httpHeaders) throws IOException {
                        if (!GcsOperations.this.errorExtractor.itemNotFound(googleJsonError)) {
                            throw new IOException(googleJsonError.toString());
                        }
                        GcsOperations.LOG.debug("{} does not exist.", fromUri);
                    }
                });
            }
            this.batchHelper.flush();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/io/FileBasedSink$LocalFileOperations.class */
    public static class LocalFileOperations implements FileOperations {
        private static final Logger LOG = LoggerFactory.getLogger(LocalFileOperations.class);

        private LocalFileOperations() {
        }

        @Override // com.google.cloud.dataflow.sdk.io.FileBasedSink.FileOperations
        public void copy(List<String> list, List<String> list2) throws IOException {
            Preconditions.checkArgument(list.size() == list2.size(), String.format("Number of source files {} must equal number of destination files {}", Integer.valueOf(list.size()), Integer.valueOf(list2.size())));
            int size = list.size();
            for (int i = 0; i < size; i++) {
                String str = list.get(i);
                String str2 = list2.get(i);
                LOG.debug("Copying {} to {}", str, str2);
                copyOne(str, str2);
            }
        }

        private void copyOne(String str, String str2) throws IOException {
            try {
                Files.copy(Paths.get(str, new String[0]), Paths.get(str2, new String[0]), StandardCopyOption.REPLACE_EXISTING);
            } catch (NoSuchFileException e) {
                LOG.debug("{} does not exist.", str);
            }
        }

        @Override // com.google.cloud.dataflow.sdk.io.FileBasedSink.FileOperations
        public void remove(Collection<String> collection) throws IOException {
            for (String str : collection) {
                LOG.debug("Removing file {}", str);
                removeOne(str);
            }
        }

        private void removeOne(String str) throws IOException {
            if (Files.deleteIfExists(Paths.get(str, new String[0]))) {
                return;
            }
            LOG.debug("{} does not exist.", str);
        }
    }

    public FileBasedSink(String str, String str2) {
        this(str, str2, ShardNameTemplate.INDEX_OF_MAX);
    }

    public FileBasedSink(String str, String str2, String str3) {
        this.baseOutputFilename = str;
        this.extension = str2;
        this.fileNamingTemplate = str3;
    }

    @Override // com.google.cloud.dataflow.sdk.io.Sink
    public void validate(PipelineOptions pipelineOptions) {
    }

    @Override // com.google.cloud.dataflow.sdk.io.Sink
    public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions pipelineOptions);
}
