package org.apache.gobblin.data.management.copy.writer;

import com.codahale.metrics.Meter;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.gobblin.broker.EmptyKey;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.NotConfiguredException;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.crypto.EncryptionConfigParser;
import org.apache.gobblin.crypto.EncryptionFactory;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.FileAwareInputStream;
import org.apache.gobblin.data.management.copy.OwnerAndPermission;
import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
import org.apache.gobblin.instrumented.writer.InstrumentedDataWriter;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.io.StreamCopier;
import org.apache.gobblin.util.io.StreamThrottler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.class */
/**
 * Data writer that copies the content of a {@link FileAwareInputStream} into a staging directory on the
 * target {@link FileSystem} and, on {@link #commit()}, renames the staged file to its final output path.
 *
 * <p>An instance processes at most one file: a second call to {@link #writeImpl(FileAwareInputStream)}
 * fails with an {@link IOException}.
 */
public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileAwareInputStream> implements FinalState, SpeculativeAttemptAwareConstruct {
    private static final Logger log = LoggerFactory.getLogger(FileAwareInputStreamDataWriter.class);

    /** Name of the meter (registered in the metric context) that tracks copied bytes. */
    public static final String GOBBLIN_COPY_BYTES_COPIED_METER = "gobblin.copy.bytesCopiedMeter";
    /** Property enabling the check that the number of copied bytes equals the expected size. */
    public static final String GOBBLIN_COPY_CHECK_FILESIZE = "gobblin.copy.checkFileSize";
    public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
    /** Property selecting {@link Options.Rename#OVERWRITE} instead of {@link Options.Rename#NONE} on commit. */
    public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = "gobblin.copy.task.overwrite.on.commit";
    public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = false;
    /** Suffix appended to the dataset staging directory when deriving the writer staging directory. */
    public static final String STAGING_DIR_SUFFIX = "/taskStaging";
    public static final String DATASET_STAGING_DIR_PATH = "dataset.staging.dir.path";

    /** Total bytes copied by this writer. */
    protected final AtomicLong bytesWritten = new AtomicLong();
    /** Number of files written by this writer. */
    protected final AtomicLong filesWritten = new AtomicLong();
    protected final WorkUnitState state;
    /** Target file system that staging and output paths live on. */
    protected final FileSystem fs;
    protected final Path stagingDir;
    protected final Path outputDir;
    /** Writer encryption settings for this branch; {@code null} is treated as "no encryption". */
    private final Map<String, Object> encryptionConfig;
    protected CopyableDatasetMetadata copyableDatasetMetadata;
    protected final RecoveryHelper recoveryHelper;
    protected final SharedResourcesBroker<GobblinScopeTypes> taskBroker;
    /** Buffer size handed to the stream copier. */
    protected final int bufferSize;
    private final boolean checkFileSize;
    private final Options.Rename renameOptions;
    /** File context on the same URI/configuration as {@link #fs}; used for the commit rename. */
    private final FileContext fileContext;
    protected final Meter copySpeedMeter;
    protected final Optional<String> writerAttemptIdOptional;
    /** The single file this writer has processed, absent until the first write. */
    protected Optional<CopyableFile> actualProcessedCopyableFile;

    /**
     * Creates a writer for the given branch.
     *
     * @param state task state; must be a {@link WorkUnitState}
     * @param i first branch argument forwarded to the branch-aware property/staging lookups; values
     *          greater than 1 are rejected
     * @param i2 second branch argument forwarded to the branch-aware property/staging lookups
     * @param str writer attempt id, or {@code null} when there is none
     * @throws IOException if {@code i > 1} or the target file system cannot be obtained
     * @throws RuntimeException if {@code state} is not a {@link WorkUnitState}
     */
    public FileAwareInputStreamDataWriter(State state, int i, int i2, String str) throws IOException {
        super(state);
        if (i > 1) {
            throw new IOException("Distcp can only operate with one branch.");
        }
        if (!(state instanceof WorkUnitState)) {
            throw new RuntimeException(String.format("Distcp requires a %s on construction.", WorkUnitState.class.getSimpleName()));
        }
        this.state = (WorkUnitState) state;
        this.taskBroker = this.state.getTaskBroker();
        this.writerAttemptIdOptional = Optional.fromNullable(str);

        String fsUriString = this.state.getProp(ForkOperatorUtils.getPropertyNameForBranch("writer.fs.uri", i, i2), "file:///");
        Configuration fsConfiguration = WriterUtils.getFsConfiguration(state);
        URI fsUri = URI.create(fsUriString);
        this.fs = FileSystem.get(fsUri, fsConfiguration);
        this.fileContext = FileContext.getFileContext(fsUri, fsConfiguration);

        if (state.getPropAsBoolean("user.defined.staging.dir.flag", false)) {
            this.stagingDir = new Path(state.getProp("user.defined.static.staging.dir"));
        } else {
            if (state.getPropAsBoolean("dataset.staging.dir.used", false)) {
                // Redirect the writer staging dir under the dataset staging dir before it is resolved below.
                state.setProp("writer.staging.dir", state.getProp(DATASET_STAGING_DIR_PATH) + STAGING_DIR_SUFFIX + "/" + state.getProp("job.name") + "/" + state.getProp("job.id"));
            }
            this.stagingDir = this.writerAttemptIdOptional.isPresent()
                    ? WriterUtils.getWriterStagingDir(state, i, i2, this.writerAttemptIdOptional.get())
                    : WriterUtils.getWriterStagingDir(state, i, i2);
        }

        this.copyableDatasetMetadata = CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
        this.outputDir = getOutputDir(state);
        this.recoveryHelper = new RecoveryHelper(this.fs, state);
        this.actualProcessedCopyableFile = Optional.absent();
        this.copySpeedMeter = getMetricContext().meter(GOBBLIN_COPY_BYTES_COPIED_METER);
        this.bufferSize = state.getPropAsInt(CopyConfiguration.BUFFER_SIZE, 32768);
        this.encryptionConfig = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.WRITER, this.state, i, i2);
        this.checkFileSize = state.getPropAsBoolean(GOBBLIN_COPY_CHECK_FILESIZE, DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE);
        this.renameOptions = state.getPropAsBoolean(GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT, DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT)
                ? Options.Rename.OVERWRITE
                : Options.Rename.NONE;
    }

    /**
     * Creates a writer without a writer attempt id.
     *
     * @see #FileAwareInputStreamDataWriter(State, int, int, String)
     */
    public FileAwareInputStreamDataWriter(State state, int i, int i2) throws IOException {
        this(state, i, i2, null);
    }

    /**
     * Writes the file carried by {@code fileAwareInputStream} to its staging path.
     *
     * <p>When encryption is configured, the file's destination is first extended with
     * {@code "." + encryptionType}.
     *
     * @param fileAwareInputStream the file and its content stream
     * @throws IOException if this writer has already processed a file, or the write fails
     */
    public final void writeImpl(FileAwareInputStream fileAwareInputStream) throws IOException {
        CopyableFile file = fileAwareInputStream.getFile();
        if (this.encryptionConfig != null) {
            file.setDestination(PathUtils.addExtension(file.getDestination(), new String[]{"." + EncryptionConfigParser.getEncryptionType(this.encryptionConfig)}));
        }
        Path stagingFilePath = getStagingFilePath(file);
        if (this.actualProcessedCopyableFile.isPresent()) {
            throw new IOException(getClass().getCanonicalName() + " can only process one file.");
        }
        this.actualProcessedCopyableFile = Optional.of(file);
        this.fs.mkdirs(stagingFilePath.getParent());
        writeImpl(fileAwareInputStream.getInputStream(), stagingFilePath, file, fileAwareInputStream);
        this.filesWritten.incrementAndGet();
    }

    /**
     * Copies {@code inputStream} to {@code path} on the target file system.
     *
     * <p>Shortcuts taken before any bytes are copied:
     * <ul>
     *   <li>if the recovery helper finds a persisted file with matching replication and block size, that
     *       file is renamed to {@code path};</li>
     *   <li>if the source is a directory, {@code path} is created as a directory.</li>
     * </ul>
     * In both shortcut cases {@code inputStream} is not closed by this method.
     *
     * @param inputStream content to copy; closed by this method once a copy has been attempted
     * @param path staging path to write to
     * @param copyableFile metadata of the file being copied
     * @param fileAwareInputStream wrapper that supplies the optional split information
     * @throws IOException if the copy fails, or if a size check is active and the number of copied bytes
     *         differs from the expected number
     */
    protected void writeImpl(InputStream inputStream, Path path, CopyableFile copyableFile, FileAwareInputStream fileAwareInputStream) throws IOException {
        final short replication = this.state.getPropAsShort("writer.file.replication.factor", copyableFile.getReplication(this.fs));
        final long blockSize = copyableFile.getBlockSize(this.fs);
        final long fileSize = copyableFile.getFileStatus().getLen();

        long expectedBytes = fileSize;
        Long maxBytes = null;
        boolean mustMatchMaxBytes = false;
        if (fileAwareInputStream.getSplit().isPresent()) {
            DistcpFileSplitter.Split split = (DistcpFileSplitter.Split) fileAwareInputStream.getSplit().get();
            maxBytes = Long.valueOf(split.getHighPosition() - split.getLowPosition());
            if (split.isLastSplit()) {
                // NOTE(review): this is 0 when the file size is an exact multiple of the block size and
                // assumes the last split spans less than one block; it only matters when checkFileSize
                // is enabled -- confirm against DistcpFileSplitter.
                expectedBytes = fileSize % blockSize;
            } else {
                expectedBytes = maxBytes.longValue();
                mustMatchMaxBytes = true;
            }
        }

        Optional<FileStatus> persistedFile = this.recoveryHelper.findPersistedFile(this.state, copyableFile, new Predicate<FileStatus>() {
            public boolean apply(FileStatus fileStatus) {
                return fileStatus.getReplication() == replication && fileStatus.getBlockSize() == blockSize;
            }
        });
        if (persistedFile.isPresent()) {
            log.info("Recovering persisted file {} to {}.", persistedFile.get().getPath(), path);
            this.fs.rename(persistedFile.get().getPath(), path);
            return;
        }
        if (copyableFile.getFileStatus().isDirectory()) {
            this.fs.mkdirs(path);
            return;
        }

        OutputStream os = this.fs.create(path, true, this.fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
        try {
            // Wrapping happens inside the guarded region so the raw stream is still closed if it fails.
            if (this.encryptionConfig != null) {
                os = EncryptionFactory.buildStreamCryptoProvider(this.encryptionConfig).encodeOutputStream(os);
            }
            try {
                FileSystem defaultFs = FileSystem.get(new Configuration());
                StreamCopier copier = new StreamCopier(
                        ((StreamThrottler) this.taskBroker.getSharedResource(new StreamThrottler.Factory(), new EmptyKey()))
                                .throttleInputStream()
                                .inputStream(inputStream)
                                .sourceURI(copyableFile.getOrigin().getPath().makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory()).toUri())
                                .targetURI(this.fs.makeQualified(path).toUri())
                                .build(),
                        os,
                        maxBytes).withBufferSize(this.bufferSize);
                log.info("File {}: Starting copy", copyableFile.getOrigin().getPath());
                if (isInstrumentationEnabled()) {
                    copier.withCopySpeedMeter(this.copySpeedMeter);
                }

                long numBytes = copier.copy();
                if ((this.checkFileSize || mustMatchMaxBytes) && numBytes != expectedBytes) {
                    throw new IOException(String.format("Incomplete write: expected %d, wrote %d bytes.", Long.valueOf(expectedBytes), Long.valueOf(numBytes)));
                }
                this.bytesWritten.addAndGet(numBytes);

                if (isInstrumentationEnabled()) {
                    log.info("File {}: copied {} bytes, average rate: {} B/s", new Object[]{copyableFile.getOrigin().getPath(), Long.valueOf(this.copySpeedMeter.getCount()), Double.valueOf(this.copySpeedMeter.getMeanRate())});
                } else {
                    log.info("File {} copied.", copyableFile.getOrigin().getPath());
                }
            } catch (NotConfiguredException e) {
                // NOTE(review): in this path the method returns normally although copy() was never
                // reached -- confirm that treating it as a successful write is intended.
                log.warn("Broker error. Some features of stream copier may not be available.", e);
            }
        } finally {
            // Close the output first, and always attempt to close the input even if that fails.
            try {
                os.close();
            } finally {
                inputStream.close();
            }
        }
    }

    /**
     * Applies the destination owner and permission of {@code copyableFile} to its staging path,
     * recursively.
     */
    protected void setFilePermissions(CopyableFile copyableFile) throws IOException {
        setRecursivePermission(getStagingFilePath(copyableFile), copyableFile.getDestinationOwnerAndPermission());
    }

    /**
     * Returns the staging path for {@code copyableFile}: the split's part name under the staging
     * directory for split work units, otherwise the destination file name under the staging directory.
     */
    protected Path getStagingFilePath(CopyableFile copyableFile) {
        if (DistcpFileSplitter.isSplitWorkUnit(this.state)) {
            return new Path(this.stagingDir, ((DistcpFileSplitter.Split) DistcpFileSplitter.getSplit(this.state).get()).getPartName());
        }
        return new Path(this.stagingDir, copyableFile.getDestination().getName());
    }

    /** Returns {@code path} extended with the identifier of {@code datasetAndPartition}. */
    protected static Path getPartitionOutputRoot(Path path, CopyEntity.DatasetAndPartition datasetAndPartition) {
        return new Path(path, datasetAndPartition.identifier());
    }

    /**
     * Returns the output path of {@code copyableFile}: its destination (without scheme, authority and
     * leading separator) resolved under the partition output root.
     */
    public static Path getOutputFilePath(CopyableFile copyableFile, Path path, CopyEntity.DatasetAndPartition datasetAndPartition) {
        Path relativeDestination = PathUtils.withoutLeadingSeparator(PathUtils.getPathWithoutSchemeAndAuthority(copyableFile.getDestination()));
        return new Path(getPartitionOutputRoot(path, datasetAndPartition), relativeDestination);
    }

    /**
     * Like {@link #getOutputFilePath}, except that for split work units the file name is replaced by
     * the split's part name.
     */
    public static Path getSplitOutputFilePath(CopyableFile copyableFile, Path path, CopyEntity.DatasetAndPartition datasetAndPartition, State state) {
        Path outputFilePath = getOutputFilePath(copyableFile, path, datasetAndPartition);
        if (DistcpFileSplitter.isSplitWorkUnit(state)) {
            return new Path(outputFilePath.getParent(), ((DistcpFileSplitter.Split) DistcpFileSplitter.getSplit(state).get()).getPartName());
        }
        return outputFilePath;
    }

    /** Returns the writer output directory configured in {@code state}. */
    public static Path getOutputDir(State state) {
        return new Path(state.getProp(ForkOperatorUtils.getPropertyNameForBranch("writer.output.dir", 1, 0)));
    }

    /**
     * Best-effort application of permission, owner and group to {@code path}. Failures are logged as
     * warnings and never propagated. Empty owner/group strings are treated as "not specified".
     */
    private void safeSetPathPermission(Path path, OwnerAndPermission ownerAndPermission) {
        try {
            if (ownerAndPermission.getFsPermission() != null) {
                this.fs.setPermission(path, ownerAndPermission.getFsPermission());
            }
        } catch (IOException e) {
            log.warn("Failed to set permission for directory " + path, e);
        }

        String owner = Strings.isNullOrEmpty(ownerAndPermission.getOwner()) ? null : ownerAndPermission.getOwner();
        String group = Strings.isNullOrEmpty(ownerAndPermission.getGroup()) ? null : ownerAndPermission.getGroup();
        if (owner == null && group == null) {
            return;
        }
        try {
            this.fs.setOwner(path, owner, group);
        } catch (IOException e) {
            log.warn("Failed to set owner and/or group for path " + path + " to " + owner + ":" + group, e);
        }
    }

    /**
     * Applies {@code ownerAndPermission} to every path listed recursively under {@code path}.
     * Directories additionally receive the owner execute bit.
     */
    private void setRecursivePermission(Path path, OwnerAndPermission ownerAndPermission) throws IOException {
        List<FileStatus> files = FileListUtils.listPathsRecursively(this.fs, path, FileListUtils.NO_OP_PATH_FILTER);
        // The listing is processed in reverse order -- presumably so that children are updated before
        // their parents; confirm against the ordering returned by FileListUtils.
        Collections.reverse(files);
        for (FileStatus file : files) {
            safeSetPathPermission(file.getPath(), addExecutePermissionsIfRequired(file, ownerAndPermission));
        }
    }

    /**
     * Returns {@code ownerAndPermission} with the owner execute bit added when {@code fileStatus} is a
     * directory and a permission is set; otherwise returns {@code ownerAndPermission} unchanged.
     */
    private static OwnerAndPermission addExecutePermissionsIfRequired(FileStatus fileStatus, OwnerAndPermission ownerAndPermission) {
        if (ownerAndPermission.getFsPermission() != null && fileStatus.isDirectory()) {
            return new OwnerAndPermission(ownerAndPermission.getOwner(), ownerAndPermission.getGroup(), addExecutePermissionToOwner(ownerAndPermission.getFsPermission()));
        }
        return ownerAndPermission;
    }

    /** Returns a copy of {@code fsPermission} whose user action also includes {@link FsAction#EXECUTE}. */
    static FsPermission addExecutePermissionToOwner(FsPermission fsPermission) {
        return new FsPermission(fsPermission.getUserAction().or(FsAction.EXECUTE), fsPermission.getGroupAction(), fsPermission.getOtherAction());
    }

    /** Returns the number of files written by this writer. */
    public long recordsWritten() {
        return this.filesWritten.get();
    }

    /** Returns the number of bytes copied by this writer. */
    public long bytesWritten() throws IOException {
        return this.bytesWritten.get();
    }

    /**
     * Moves the staged file to its final output path. Does nothing if no file has been processed.
     *
     * <p>If any commit step fails, the staged file is handed to the recovery helper and the exception
     * is rethrown. The staging directory is deleted afterwards in every case; a failure to delete it is
     * only logged.
     *
     * @throws IOException if setting permissions, creating the output directory or the rename fails
     */
    public void commit() throws IOException {
        if (!this.actualProcessedCopyableFile.isPresent()) {
            return;
        }
        CopyableFile copyableFile = this.actualProcessedCopyableFile.get();
        Path stagingFilePath = getStagingFilePath(copyableFile);
        Path outputFilePath = getSplitOutputFilePath(copyableFile, this.outputDir, copyableFile.getDatasetAndPartition(this.copyableDatasetMetadata), this.state);
        log.info("Committing data from {} to {}", stagingFilePath, outputFilePath);
        try {
            setFilePermissions(copyableFile);
            ensureDirectoryExists(this.fs, outputFilePath.getParent(), copyableFile.getAncestorsOwnerAndPermission() == null ? Iterators.<OwnerAndPermission>emptyIterator() : copyableFile.getAncestorsOwnerAndPermission().iterator());
            this.fileContext.rename(stagingFilePath, outputFilePath, new Options.Rename[]{this.renameOptions});
        } catch (IOException commitFailure) {
            log.error("Could not commit file {}.", outputFilePath, commitFailure);
            // Must run before the finally block below: it removes the staging directory that still
            // contains the file being persisted.
            this.recoveryHelper.persistFile(this.state, copyableFile, stagingFilePath);
            throw commitFailure;
        } finally {
            try {
                this.fs.delete(this.stagingDir, true);
            } catch (IOException deleteFailure) {
                log.warn("Failed to delete staging path at " + this.stagingDir, deleteFailure);
            }
        }
    }

    /**
     * Creates {@code path} and any missing ancestors. One element of {@code it} is consumed for each
     * missing level, starting at {@code path} and moving upward, and is applied to the directory
     * created at that level. Once the iterator is exhausted the remaining levels are created with a
     * plain {@code mkdirs}. Existing directories are left untouched.
     */
    private void ensureDirectoryExists(FileSystem fileSystem, Path path, Iterator<OwnerAndPermission> it) throws IOException {
        if (fileSystem.exists(path)) {
            return;
        }
        if (!it.hasNext()) {
            fileSystem.mkdirs(path);
            return;
        }

        OwnerAndPermission ownerAndPermission = it.next();
        if (path.getParent() != null) {
            ensureDirectoryExists(fileSystem, path.getParent(), it);
        }
        if (!fileSystem.mkdirs(path)) {
            // mkdirs reported that nothing was created, so no permission or ownership is applied.
            return;
        }

        if (ownerAndPermission.getFsPermission() != null) {
            log.debug("Applying permissions {} to path {}.", ownerAndPermission.getFsPermission(), path);
            fileSystem.setPermission(path, addExecutePermissionToOwner(ownerAndPermission.getFsPermission()));
        }
        String group = ownerAndPermission.getGroup();
        String owner = ownerAndPermission.getOwner();
        if (group == null && owner == null) {
            return;
        }
        log.debug("Applying owner {} and group {} to path {}.", new Object[]{owner, group, path});
        fileSystem.setOwner(path, owner, group);
    }

    /** No-op: the staging directory is removed in {@link #commit()}. */
    public void cleanup() throws IOException {
    }

    /**
     * Returns a {@link ConstructState} whose overwrite properties contain the serialized processed
     * file, if one was processed; otherwise the overwrite properties are an empty state.
     */
    public State getFinalState() {
        State state = new State();
        if (this.actualProcessedCopyableFile.isPresent()) {
            CopySource.serializeCopyEntity(state, (CopyEntity) this.actualProcessedCopyableFile.get());
        }
        ConstructState constructState = new ConstructState();
        constructState.addOverwriteProperties(state);
        return constructState;
    }

    /**
     * Returns {@code true} only when a writer attempt id was supplied and the runtime class is exactly
     * {@code FileAwareInputStreamDataWriter} (subclasses report {@code false}).
     */
    public boolean isSpeculativeAttemptSafe() {
        return this.writerAttemptIdOptional.isPresent() && getClass() == FileAwareInputStreamDataWriter.class;
    }
}
