package gobblin.data.management.copy.writer;

import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import com.google.common.io.Closer;
import gobblin.configuration.State;
import gobblin.data.management.copy.CopySource;
import gobblin.data.management.copy.CopyableDatasetMetadata;
import gobblin.data.management.copy.CopyableFile;
import gobblin.data.management.copy.FileAwareInputStream;
import gobblin.data.management.copy.OwnerAndPermission;
import gobblin.data.management.copy.PreserveAttributes;
import gobblin.data.management.copy.recovery.RecoveryHelper;
import gobblin.state.ConstructState;
import gobblin.util.FileListUtils;
import gobblin.util.FinalState;
import gobblin.util.ForkOperatorUtils;
import gobblin.util.PathUtils;
import gobblin.util.WriterUtils;
import gobblin.util.io.StreamUtils;
import gobblin.writer.DataWriter;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.class */
/**
 * A {@link DataWriter} that copies the content of a {@link FileAwareInputStream} into a file under the
 * writer staging directory and, on {@link #commit()}, renames that file to its final location under the
 * writer output directory.
 *
 * <p>If a commit fails, the staged file is handed to the {@link RecoveryHelper} so that a later
 * {@link #writeImpl} call can reuse it instead of copying the bytes again.</p>
 *
 * <p>Only a single branch is supported; the constructor rejects configurations with more than one.</p>
 */
public class FileAwareInputStreamDataWriter implements DataWriter<FileAwareInputStream>, FinalState {
    private static final Logger log = LoggerFactory.getLogger(FileAwareInputStreamDataWriter.class);

    private static final String WRITER_FS_URI_KEY = "writer.fs.uri";
    private static final String DEFAULT_WRITER_FS_URI = "file:///";
    private static final String WRITER_OUTPUT_DIR_KEY = "writer.output.dir";
    private static final String IO_BUFFER_SIZE_KEY = "io.file.buffer.size";
    private static final int DEFAULT_IO_BUFFER_SIZE = 4096;
    private static final String WROTE_PROPS_KEY = "did.i.actually.write.props";

    protected final State state;
    protected final FileSystem fs;
    protected final Path stagingDir;
    protected final Path outputDir;
    protected CopyableDatasetMetadata copyableDatasetMetadata;
    protected final RecoveryHelper recoveryHelper;
    /** The file handed to the most recent {@link #write} call; absent until the first write. */
    protected Optional<CopyableFile> actualProcessedCopyableFile;
    protected final AtomicLong bytesWritten = new AtomicLong();
    protected final AtomicLong filesWritten = new AtomicLong();
    protected final Closer closer = Closer.create();

    /**
     * Creates a writer bound to the file system, staging directory and output directory configured in
     * {@code state}.
     *
     * @param state       job/task configuration
     * @param numBranches number of branches; must not exceed 1
     * @param branchId    presumably the id of this writer's branch (passed through to the branch-aware
     *                    property lookups) -- TODO confirm against callers
     * @throws IOException if more than one branch is configured or the file system cannot be obtained
     */
    public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId) throws IOException {
        if (numBranches > 1) {
            throw new IOException("Distcp can only operate with one branch.");
        }
        this.state = state;
        String fsUri = this.state.getProp(
                ForkOperatorUtils.getPropertyNameForBranch(WRITER_FS_URI_KEY, numBranches, branchId), DEFAULT_WRITER_FS_URI);
        this.fs = FileSystem.get(URI.create(fsUri), new Configuration());
        this.stagingDir = WriterUtils.getWriterStagingDir(state, numBranches, branchId);
        this.outputDir = getOutputDir(state);
        this.copyableDatasetMetadata = CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
        this.recoveryHelper = new RecoveryHelper(this.fs, state);
        this.actualProcessedCopyableFile = Optional.absent();
    }

    /**
     * Writes the stream's content to the staging location of its {@link CopyableFile} and records that
     * file as the one to be committed.
     *
     * @param fileAwareInputStream the stream and the file metadata it belongs to
     * @throws IOException if the staging directory cannot be created or the copy fails
     */
    public final void write(FileAwareInputStream fileAwareInputStream) throws IOException {
        CopyableFile copyableFile = fileAwareInputStream.getFile();
        Path stagingFile = getStagingFilePath(copyableFile);
        this.actualProcessedCopyableFile = Optional.of(copyableFile);
        this.fs.mkdirs(stagingFile.getParent());
        writeImpl(fileAwareInputStream.getInputStream(), stagingFile, copyableFile);
        this.filesWritten.incrementAndGet();
    }

    /**
     * Materializes {@code copyableFile} at {@code path}.
     *
     * <p>If the {@link RecoveryHelper} has a persisted file with matching replication and block size, that
     * file is renamed into place. If there is none, or the rename reports failure, the bytes are copied
     * from {@code fSDataInputStream}. The input stream is always closed before this method returns,
     * whichever path is taken and whether or not an exception is thrown.</p>
     *
     * @param fSDataInputStream source of the file content; closed by this method
     * @param path              staging location to write to
     * @param copyableFile      metadata of the file being copied
     * @throws IOException if looking up the persisted file, creating the target, or copying fails
     */
    protected void writeImpl(FSDataInputStream fSDataInputStream, Path path, CopyableFile copyableFile) throws IOException {
        try {
            final short replication = copyableFile.getPreserve().preserve(PreserveAttributes.Option.REPLICATION)
                    ? copyableFile.getOrigin().getReplication()
                    : this.fs.getDefaultReplication(path);
            final long blockSize = copyableFile.getPreserve().preserve(PreserveAttributes.Option.BLOCK_SIZE)
                    ? copyableFile.getOrigin().getBlockSize()
                    : this.fs.getDefaultBlockSize(path);

            // Only reuse a persisted file whose storage attributes match what a fresh copy would get.
            Optional<FileStatus> persistedFile = this.recoveryHelper.findPersistedFile(this.state, copyableFile, new Predicate<FileStatus>() {
                public boolean apply(FileStatus fileStatus) {
                    return fileStatus.getReplication() == replication && fileStatus.getBlockSize() == blockSize;
                }
            });

            if (persistedFile.isPresent()) {
                Path persistedPath = persistedFile.get().getPath();
                log.info(String.format("Recovering persisted file %s to %s.", persistedPath, path));
                if (this.fs.rename(persistedPath, path)) {
                    return;
                }
                // rename() signals failure through its return value; fall back to a regular copy
                // rather than leaving the staging path empty for commit() to trip over.
                log.warn(String.format("Could not recover persisted file %s to %s, copying from source instead.", persistedPath, path));
            }

            int bufferSize = this.fs.getConf().getInt(IO_BUFFER_SIZE_KEY, DEFAULT_IO_BUFFER_SIZE);
            FSDataOutputStream out = this.fs.create(path, true, bufferSize, replication, blockSize);
            try {
                this.bytesWritten.addAndGet(StreamUtils.copy(fSDataInputStream, out));
                log.info("bytes written: " + this.bytesWritten.get() + " for file " + copyableFile);
            } finally {
                out.close();
            }
        } finally {
            // Runs even if closing the output stream throws, and on the recovery path.
            fSDataInputStream.close();
        }
    }

    /**
     * Applies the destination owner and permission of {@code copyableFile} to its staged file.
     *
     * @throws IOException if the staged path cannot be listed
     */
    protected void setFilePermissions(CopyableFile copyableFile) throws IOException {
        setRecursivePermission(getStagingFilePath(copyableFile), copyableFile.getDestinationOwnerAndPermission());
    }

    /** Returns the staging path for {@code copyableFile}: the destination's file name under the staging directory. */
    protected Path getStagingFilePath(CopyableFile copyableFile) {
        return new Path(this.stagingDir, copyableFile.getDestination().getName());
    }

    /** Returns the child of {@code path} named after the identifier of {@code datasetAndPartition}. */
    protected static Path getPartitionOutputRoot(Path path, CopyableFile.DatasetAndPartition datasetAndPartition) {
        return new Path(path, datasetAndPartition.identifier());
    }

    /**
     * Returns the final output path of {@code copyableFile}: its destination path, stripped of the leading
     * separator, resolved under the partition output root below {@code path}.
     */
    public static Path getOutputFilePath(CopyableFile copyableFile, Path path, CopyableFile.DatasetAndPartition datasetAndPartition) {
        Path partitionRoot = getPartitionOutputRoot(path, datasetAndPartition);
        return new Path(partitionRoot, PathUtils.withoutLeadingSeparator(copyableFile.getDestination()));
    }

    /** Returns the writer output directory configured in {@code state} (looked up for 1 branch, branch 0). */
    public static Path getOutputDir(State state) {
        return new Path(state.getProp(ForkOperatorUtils.getPropertyNameForBranch(WRITER_OUTPUT_DIR_KEY, 1, 0)));
    }

    /**
     * Best-effort application of permission, owner and group to {@code path}. Failures are logged and
     * swallowed; null permission and null/empty owner or group are skipped.
     */
    private void safeSetPathPermission(Path path, OwnerAndPermission ownerAndPermission) {
        try {
            if (ownerAndPermission.getFsPermission() != null) {
                this.fs.setPermission(path, ownerAndPermission.getFsPermission());
            }
        } catch (IOException e) {
            log.warn("Failed to set permission for directory " + path, e);
        }

        String owner = Strings.emptyToNull(ownerAndPermission.getOwner());
        String group = Strings.emptyToNull(ownerAndPermission.getGroup());
        if (owner == null && group == null) {
            return;
        }
        try {
            this.fs.setOwner(path, owner, group);
        } catch (IOException e) {
            log.warn("Failed to set owner and/or group for path " + path, e);
        }
    }

    /**
     * Applies {@code ownerAndPermission} to every path listed recursively under {@code path}, visiting them
     * in the reverse of the order returned by the listing.
     */
    private void setRecursivePermission(Path path, OwnerAndPermission ownerAndPermission) throws IOException {
        List<FileStatus> statuses = FileListUtils.listPathsRecursively(this.fs, path, FileListUtils.NO_OP_PATH_FILTER);
        Collections.reverse(statuses);
        for (FileStatus status : statuses) {
            safeSetPathPermission(status.getPath(), addExecutePermissionsIfRequired(status, ownerAndPermission));
        }
    }

    /**
     * For directories with a non-null permission, returns a copy of {@code ownerAndPermission} whose
     * permission additionally grants execute to the owner; otherwise returns the argument unchanged.
     */
    private OwnerAndPermission addExecutePermissionsIfRequired(FileStatus fileStatus, OwnerAndPermission ownerAndPermission) {
        if (ownerAndPermission.getFsPermission() == null || !fileStatus.isDir()) {
            return ownerAndPermission;
        }
        return new OwnerAndPermission(ownerAndPermission.getOwner(), ownerAndPermission.getGroup(),
                addExecutePermissionToOwner(ownerAndPermission.getFsPermission()));
    }

    /** Returns a permission equal to {@code fsPermission} with the execute bit added to the user action. */
    static FsPermission addExecutePermissionToOwner(FsPermission fsPermission) {
        FsAction userAction = fsPermission.getUserAction().or(FsAction.EXECUTE);
        return new FsPermission(userAction, fsPermission.getGroupAction(), fsPermission.getOtherAction());
    }

    /** Returns the number of files successfully passed through {@link #write}. */
    public long recordsWritten() {
        return this.filesWritten.get();
    }

    /** Returns the number of bytes copied from input streams so far. */
    public long bytesWritten() throws IOException {
        return this.bytesWritten.get();
    }

    /** Closes everything registered with {@link #closer}. */
    public void close() throws IOException {
        this.closer.close();
    }

    /**
     * Moves the staged file to its final output path, after applying file permissions and creating any
     * missing ancestor directories. Does nothing if no file has been written.
     *
     * <p>On an {@link IOException} the staged file is handed to the {@link RecoveryHelper} before the
     * exception is rethrown. The staging directory is deleted afterwards in every case; a failure to
     * delete it is only logged.</p>
     *
     * @throws IOException if permissions/directories cannot be prepared or the rename reports failure
     */
    public void commit() throws IOException {
        if (!this.actualProcessedCopyableFile.isPresent()) {
            log.info("No file was written by this writer, nothing to commit.");
            return;
        }
        CopyableFile copyableFile = this.actualProcessedCopyableFile.get();
        Path stagingFilePath = getStagingFilePath(copyableFile);
        Path outputFilePath = getOutputFilePath(copyableFile, this.outputDir,
                copyableFile.getDatasetAndPartition(this.copyableDatasetMetadata));
        log.info(String.format("Committing data from %s to %s", stagingFilePath, outputFilePath));
        try {
            setFilePermissions(copyableFile);
            // Explicit type witness: without target typing the conditional would not infer the element type.
            Iterator<OwnerAndPermission> ancestorPermissions = copyableFile.getAncestorsOwnerAndPermission() == null
                    ? Iterators.<OwnerAndPermission>emptyIterator()
                    : copyableFile.getAncestorsOwnerAndPermission().iterator();
            ensureDirectoryExists(this.fs, outputFilePath.getParent(), ancestorPermissions);
            if (!this.fs.rename(stagingFilePath, outputFilePath)) {
                throw new IOException(String.format("Could not commit file %s.", outputFilePath));
            }
        } catch (IOException e) {
            // Keep the staged bytes around so a retry can recover them instead of copying again.
            this.recoveryHelper.persistFile(this.state, copyableFile, stagingFilePath);
            throw e;
        } finally {
            try {
                this.fs.delete(this.stagingDir, true);
            } catch (IOException deleteFailure) {
                log.warn("Failed to delete staging path at " + this.stagingDir, deleteFailure);
            }
        }
    }

    /**
     * Creates {@code path} and any missing ancestors. The iterator supplies owner/permission settings
     * starting with the one for {@code path} itself, followed by those for its parent, grandparent, and so
     * on; once it is exhausted the remaining directories are created with a plain {@code mkdirs}.
     * Directories that already exist are left untouched.
     */
    private void ensureDirectoryExists(FileSystem fileSystem, Path path, Iterator<OwnerAndPermission> it) throws IOException {
        if (fileSystem.exists(path)) {
            return;
        }
        if (!it.hasNext()) {
            fileSystem.mkdirs(path);
            return;
        }
        // Take this level's settings before recursing so the parent sees the next element.
        OwnerAndPermission settings = it.next();
        if (path.getParent() != null) {
            ensureDirectoryExists(fileSystem, path.getParent(), it);
        }

        boolean created = settings.getFsPermission() == null
                ? fileSystem.mkdirs(path)
                : fileSystem.mkdirs(path, addExecutePermissionToOwner(settings.getFsPermission()));
        if (!created) {
            return;
        }

        String group = settings.getGroup();
        String owner = settings.getOwner();
        if (group != null || owner != null) {
            fileSystem.setOwner(path, owner, group);
        }
    }

    /** No-op. */
    public void cleanup() throws IOException {
    }

    /**
     * Builds the final state of this writer: the serialized form of the processed {@link CopyableFile}
     * (when one has been written) plus a marker property.
     *
     * @throws RuntimeException if the processed file cannot be serialized
     */
    public State getFinalState() {
        try {
            State fileState = new State();
            // Skip serialization instead of failing on Optional.get() when nothing was written.
            if (this.actualProcessedCopyableFile.isPresent()) {
                CopySource.serializeCopyableFile(fileState, this.actualProcessedCopyableFile.get());
            }
            fileState.setProp(WROTE_PROPS_KEY, "yes");
            ConstructState constructState = new ConstructState();
            constructState.addOverwriteProperties(fileState);
            return constructState;
        } catch (IOException e) {
            throw new RuntimeException("Could not serialize actual processed copyable file.", e);
        }
    }
}
