package gobblin.data.management.copy.writer;

import com.google.common.io.Closer;
import gobblin.configuration.State;
import gobblin.data.management.copy.CopyableFile;
import gobblin.data.management.copy.FileAwareInputStream;
import gobblin.data.management.copy.OwnerAndPermission;
import gobblin.util.FileListUtils;
import gobblin.util.ForkOperatorUtils;
import gobblin.util.HadoopUtils;
import gobblin.util.PathUtils;
import gobblin.util.WriterUtils;
import gobblin.util.io.StreamUtils;
import gobblin.writer.DataWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.class */
/**
 * A {@link DataWriter} that copies the content of each {@link FileAwareInputStream} into a file
 * under a staging directory and, on {@link #commit()}, moves the staging directory's content to
 * the output directory.
 *
 * <p>After a file has been copied, the owner and permissions described by its
 * {@link CopyableFile} are applied to the staged file and to its ancestor directories.
 */
public class FileAwareInputStreamDataWriter implements DataWriter<FileAwareInputStream> {
    private static final Logger log = LoggerFactory.getLogger(FileAwareInputStreamDataWriter.class);

    /** Job/task state this writer was configured from. */
    protected final State state;
    /** File system that staging and output paths are resolved against. */
    protected final FileSystem fs;
    /** Directory that files are first written to. */
    protected final Path stagingDir;
    /** Directory that staged files are moved to on {@link #commit()}. */
    protected final Path outputDir;
    /** Total number of bytes copied by {@link #write(FileAwareInputStream)}. */
    protected final AtomicLong bytesWritten = new AtomicLong();
    /** Number of files fully copied by {@link #write(FileAwareInputStream)}. */
    protected final AtomicLong filesWritten = new AtomicLong();
    /** Closed by {@link #close()}; nothing in this class registers resources with it. */
    protected final Closer closer = Closer.create();

    /**
     * Creates a writer configured from {@code state}.
     *
     * <p>The file system URI is read from the branch-specific {@code writer.fs.uri} property
     * (default {@code file:///}) and the output directory from the branch-specific
     * {@code writer.output.dir} property.
     *
     * @param state configuration to read writer properties from
     * @param i first branch argument, passed through to
     *     {@link ForkOperatorUtils#getPropertyNameForBranch} and
     *     {@link WriterUtils#getWriterStagingDir} (presumably the number of branches; confirm)
     * @param i2 second branch argument, passed through to the same helpers
     *     (presumably the branch id; confirm)
     * @throws IOException if the file system cannot be obtained
     */
    public FileAwareInputStreamDataWriter(State state, int i, int i2) throws IOException {
        this.state = state;
        String fsUri = this.state.getProp(ForkOperatorUtils.getPropertyNameForBranch("writer.fs.uri", i, i2), "file:///");
        this.fs = FileSystem.get(URI.create(fsUri), new Configuration());
        this.stagingDir = WriterUtils.getWriterStagingDir(state, i, i2);
        this.outputDir = new Path(state.getProp(ForkOperatorUtils.getPropertyNameForBranch("writer.output.dir", i, i2)));
    }

    /**
     * Copies the stream's content to the staging path of its {@link CopyableFile}, overwriting
     * any existing staged file, then applies the destination owner and permissions.
     *
     * <p>Both the output stream and the supplied input stream are closed before this method
     * returns, whether the copy succeeds or fails. The file counter is only incremented after
     * a successful copy and close.
     *
     * @param fileAwareInputStream the stream to copy together with its file metadata
     * @throws IOException if creating the staging file, copying, or closing a stream fails
     */
    @Override
    public void write(FileAwareInputStream fileAwareInputStream) throws IOException {
        final CopyableFile file = fileAwareInputStream.getFile();
        final Path stagingFilePath = getStagingFilePath(file);

        // Method-local Closer: closes the streams in reverse registration order (output first,
        // then input) and keeps the primary exception when a close() call fails as well.
        final Closer streams = Closer.create();
        try {
            InputStream in = streams.register(fileAwareInputStream.getInputStream());
            this.fs.mkdirs(stagingFilePath.getParent(), file.getDestinationOwnerAndPermission().getFsPermission());
            FSDataOutputStream out = streams.register(this.fs.create(stagingFilePath, true));
            this.bytesWritten.addAndGet(StreamUtils.copy(in, out));
            log.info("bytes written: {} for file {}", this.bytesWritten.get(), file);
        } catch (Throwable t) {
            throw streams.rethrow(t);
        } finally {
            streams.close();
        }

        // Only reached when the copy and both close() calls succeeded.
        this.filesWritten.incrementAndGet();
        setFilePermissions(file);
    }

    /**
     * Applies ancestor permissions and then the destination owner/permission to the staged
     * copy of {@code copyableFile}. This is best effort: an {@link IOException} is logged and
     * not propagated.
     *
     * <p>Note: the decompiler reports this method was originally {@code protected}; it is kept
     * {@code public} here so existing callers of this source keep compiling.
     *
     * @param copyableFile the file whose staged copy should receive permissions
     */
    public void setFilePermissions(CopyableFile copyableFile) {
        try {
            setAncestorPermissions(copyableFile);
            setRecursivePermission(getStagingFilePath(copyableFile), copyableFile.getDestinationOwnerAndPermission());
        } catch (IOException e) {
            log.error("Failed to set permissions for " + copyableFile.getOrigin(), e);
        }
    }

    /**
     * Returns the path of {@code copyableFile} under the staging directory.
     *
     * @param copyableFile the file whose destination is resolved
     * @return the destination, stripped of its leading separator, resolved against the staging directory
     */
    protected Path getStagingFilePath(CopyableFile copyableFile) {
        return new Path(this.stagingDir, PathUtils.withoutLeadingSeparator(copyableFile.getDestination()));
    }

    /**
     * Returns the path of {@code copyableFile} under the output directory.
     *
     * @param copyableFile the file whose destination is resolved
     * @return the destination, stripped of its leading separator, resolved against the output directory
     */
    protected Path getOutputFilePath(CopyableFile copyableFile) {
        return new Path(this.outputDir, PathUtils.withoutLeadingSeparator(copyableFile.getDestination()));
    }

    /**
     * Walks up from the staged file's parent directory, applying one entry of the file's
     * ancestor owner/permission list per directory level. Does nothing when the list is
     * {@code null}; stops (with a log message) when the path runs out of parents before the
     * list is exhausted.
     */
    private void setAncestorPermissions(CopyableFile copyableFile) throws IOException {
        List<OwnerAndPermission> ancestors = copyableFile.getAncestorsOwnerAndPermission();
        if (ancestors == null) {
            return;
        }
        Path current = getStagingFilePath(copyableFile).getParent();
        for (OwnerAndPermission ownerAndPermission : ancestors) {
            if (current == null) {
                log.info("Ancestor owner and permission may not be set correctly. Exhausted parent paths before ancestor permissions");
                log.info("File destination path {}, AncestorOwnerAndPermissions size {}.", copyableFile.getDestination(), ancestors.size());
                return;
            }
            setPathPermission(current, ownerAndPermission);
            current = current.getParent();
        }
    }

    /**
     * Sets the permission on {@code path}, and the owner and group as well when both are
     * non-blank; otherwise ownership is left untouched and a message is logged.
     */
    private void setPathPermission(Path path, OwnerAndPermission ownerAndPermission) throws IOException {
        this.fs.setPermission(path, ownerAndPermission.getFsPermission());
        if (StringUtils.isNotBlank(ownerAndPermission.getGroup()) && StringUtils.isNotBlank(ownerAndPermission.getOwner())) {
            this.fs.setOwner(path, ownerAndPermission.getOwner(), ownerAndPermission.getGroup());
        } else {
            log.info("Owner and group will not be set as no valid user and group available for {}", path);
        }
    }

    /**
     * Applies {@code ownerAndPermission} to every path returned by
     * {@link FileListUtils#listPathsRecursively} for {@code path}, iterating the listing in
     * reverse order (presumably so nested entries are handled before their parents; confirm
     * against the listing order of that helper).
     */
    private void setRecursivePermission(Path path, OwnerAndPermission ownerAndPermission) throws IOException {
        List<FileStatus> statuses = FileListUtils.listPathsRecursively(this.fs, path, FileListUtils.NO_OP_PATH_FILTER);
        Collections.reverse(statuses);
        for (FileStatus fileStatus : statuses) {
            setPathPermission(fileStatus.getPath(), addExecutePermissionsIfRequired(fileStatus, ownerAndPermission));
        }
    }

    /**
     * Returns the owner/permission to apply to {@code fileStatus}. Non-directories get
     * {@code ownerAndPermission} unchanged. For directories, a user action of READ, WRITE or
     * READ_WRITE is widened with execute; any other user action, and the group and other
     * actions, are kept as they are.
     */
    private OwnerAndPermission addExecutePermissionsIfRequired(FileStatus fileStatus, OwnerAndPermission ownerAndPermission) {
        if (!fileStatus.isDir()) {
            return ownerAndPermission;
        }
        FsPermission requested = ownerAndPermission.getFsPermission();
        FsAction userAction = requested.getUserAction();
        switch (userAction) {
            case READ:
                userAction = FsAction.READ_EXECUTE;
                break;
            case WRITE:
                userAction = FsAction.WRITE_EXECUTE;
                break;
            case READ_WRITE:
                userAction = FsAction.ALL;
                break;
            default:
                // NONE, EXECUTE and the *_EXECUTE/ALL actions are left unchanged.
                break;
        }
        return new OwnerAndPermission(ownerAndPermission.getOwner(), ownerAndPermission.getGroup(),
                new FsPermission(userAction, requested.getGroupAction(), requested.getOtherAction()));
    }

    /**
     * @return the number of files written so far (one "record" per file)
     */
    public long recordsWritten() {
        return this.filesWritten.get();
    }

    /**
     * @return the total number of bytes copied so far
     * @throws IOException never thrown by this implementation; declared for the writer contract
     */
    public long bytesWritten() throws IOException {
        return this.bytesWritten.get();
    }

    /**
     * Closes the resources registered with {@link #closer}.
     *
     * @throws IOException if closing a registered resource fails
     */
    public void close() throws IOException {
        this.closer.close();
    }

    /**
     * Moves the staged data to the output directory via
     * {@link HadoopUtils#renameRecursively} and then recursively deletes the staging directory.
     *
     * @throws IOException if the rename or the delete fails
     */
    public void commit() throws IOException {
        log.info("Committing data from {} to {}", this.stagingDir, this.outputDir);
        HadoopUtils.renameRecursively(this.fs, this.stagingDir, this.outputDir);
        this.fs.delete(this.stagingDir, true);
    }

    /**
     * No-op: this writer has nothing to clean up.
     *
     * @throws IOException never thrown by this implementation
     */
    public void cleanup() throws IOException {
    }
}
