package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.io.block.BlockWriter;
import com.datatorrent.lib.io.fs.Synchronizer;
import com.datatorrent.lib.io.fs.Synchronizer.StitchedFileMetaData;
import com.google.common.collect.Queues;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Queue;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/fs/FileStitcher.class */
/**
 * Reconciler that assembles an output file from the blocks listed in a
 * {@link Synchronizer.StitchedFileMetaData} tuple.
 *
 * <p>For each committed tuple the blocks are written, in list order, to a temporary file under
 * {@link #filePath} (suffix {@link #PART_FILE_EXTENTION}) and the temporary file is then renamed
 * to its final name. Blocks are read through {@link #appFS} from {@link #blocksDirectoryPath};
 * output is written through {@link #outputFS}.
 *
 * @param <T> concrete meta-data type describing the file to stitch
 */
public class FileStitcher<T extends Synchronizer.StitchedFileMetaData> extends AbstractReconciler<T, T> {
    /** File system used to read block files; created in {@link #setup} and closed in {@link #teardown}. */
    protected transient FileSystem appFS;
    /** File system used to write stitched files; created in {@link #setup} and closed in {@link #teardown}. */
    protected transient FileSystem outputFS;

    /** Base path under which stitched files are created. */
    @NotNull
    protected String filePath;
    /** Application path + "/" + {@link #blocksDirectory}; computed in {@link #setup}. */
    protected transient String blocksDirectoryPath;
    /** Suffix of temporary (not yet renamed) output files. */
    protected static final String PART_FILE_EXTENTION = "._COPYING_";
    /** Temporary file currently being written by {@link #mergeBlocks}. */
    protected transient Path tempOutFilePath;
    protected static final Logger LOG = LoggerFactory.getLogger(FileStitcher.class);
    /** Name of the blocks directory, relative to the application path. */
    private String blocksDirectory = BlockWriter.DEFAULT_BLOCKS_DIR;
    /** Tuples whose merge completed; filled by {@link #mergeOutputFile}, drained in {@link #endWindow}. */
    protected Queue<T> successfulFiles = Queues.newLinkedBlockingQueue();
    /** Tuples that were skipped; never filled by this class (presumably by subclasses). */
    protected Queue<T> skippedFiles = Queues.newLinkedBlockingQueue();
    /** Tuples that failed; never filled by this class (presumably by subclasses). */
    protected Queue<T> failedFiles = Queues.newLinkedBlockingQueue();

    /** Optional port on which the meta-data of every completed tuple is emitted from {@link #endWindow}. */
    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<T> completedFilesMetaOutput = new DefaultOutputPort<>();
    /** Value passed to {@code FileSystem.setWriteChecksum} on the output file system. */
    private boolean writeChecksum = true;

    /**
     * Signals that a block file required for stitching could not be found.
     */
    public static class BlockNotFoundException extends Exception {
        private static final long serialVersionUID = -7409415466834194798L;
        Path blockPath;

        /**
         * @param path path of the missing block file
         */
        public BlockNotFoundException(Path path) {
            // Carry the path in the message so that logs and stack traces identify the block.
            super("Block file not found: " + path);
            this.blockPath = path;
        }

        /**
         * @return path of the missing block file
         */
        public Path getBlockPath() {
            return this.blockPath;
        }
    }

    /**
     * Resolves the blocks directory and opens the output and application file systems, then
     * delegates to the parent setup.
     *
     * @param operatorContext operator context supplying the application path
     * @throws RuntimeException if either file system cannot be obtained, or if the output file
     *         system cannot be closed after the application file system failed to open
     */
    @Override // com.datatorrent.lib.io.fs.AbstractReconciler
    public void setup(Context.OperatorContext operatorContext) {
        this.blocksDirectoryPath = ((String) operatorContext.getValue(Context.DAGContext.APPLICATION_PATH)) + "/" + this.blocksDirectory;
        try {
            this.outputFS = getOutputFSInstance();
            this.outputFS.setWriteChecksum(this.writeChecksum);
        } catch (IOException e) {
            throw new RuntimeException("Exception in getting output file system.", e);
        }
        try {
            this.appFS = getAppFSInstance();
        } catch (IOException e) {
            // Do not leak the output file system that was already opened.
            try {
                this.outputFS.close();
            } catch (IOException closeFailure) {
                // Keep the original failure visible alongside the close failure.
                closeFailure.addSuppressed(e);
                throw new RuntimeException("Exception in closing output file system.", closeFailure);
            }
            throw new RuntimeException("Exception in getting application file system.", e);
        }
        // Called last, after both file systems have been opened.
        super.setup(operatorContext);
    }

    /**
     * Drains the tuples that were in {@code doneTuples} when the call started: removes each from
     * the successful/skipped/failed queue that holds it, emits it on
     * {@link #completedFilesMetaOutput} and removes it from {@code committedTuples}.
     *
     * @throws RuntimeException if a done tuple is in none of the three result queues
     */
    @Override // com.datatorrent.lib.io.fs.AbstractReconciler
    public void endWindow() {
        // Snapshot the size so that tuples added while draining are left for the next window.
        int pending = this.doneTuples.size();
        for (int i = 0; i < pending; i++) {
            T doneTuple = this.doneTuples.peek();
            // remove() reports whether the element was present, so no separate contains() is needed.
            if (this.successfulFiles.remove(doneTuple)) {
                LOG.debug("File copy successful: {}", doneTuple.getStitchedFileRelativePath());
            } else if (this.skippedFiles.remove(doneTuple)) {
                LOG.debug("File copy skipped: {}", doneTuple.getStitchedFileRelativePath());
            } else if (this.failedFiles.remove(doneTuple)) {
                LOG.debug("File copy failed: {}", doneTuple.getStitchedFileRelativePath());
            } else {
                throw new RuntimeException("Tuple present in doneTuples but not in sucessful /skipped/ failed files: " + doneTuple.getStitchedFileRelativePath());
            }
            this.completedFilesMetaOutput.emit(doneTuple);
            this.committedTuples.remove(doneTuple);
            // Only drop the tuple from doneTuples once it has been fully handled.
            this.doneTuples.poll();
        }
    }

    /**
     * @return a new file system instance for the URI of {@link #blocksDirectoryPath}
     * @throws IOException if the file system cannot be created
     */
    protected FileSystem getAppFSInstance() throws IOException {
        return FileSystem.newInstance(new Path(this.blocksDirectoryPath).toUri(), new Configuration());
    }

    /**
     * @return a new file system instance for the URI of {@link #filePath}
     * @throws IOException if the file system cannot be created
     */
    protected FileSystem getOutputFSInstance() throws IOException {
        return FileSystem.newInstance(new Path(this.filePath).toUri(), new Configuration());
    }

    /**
     * Runs the parent teardown and closes both file systems. Closing the second file system is
     * attempted even when closing the first one fails.
     *
     * @throws RuntimeException if closing either file system fails; the first failure is the
     *         cause and a second failure, if any, is attached to it as suppressed
     */
    @Override // com.datatorrent.lib.io.fs.AbstractReconciler
    public void teardown() {
        super.teardown();
        IOException closeFailure = null;
        try {
            if (this.appFS != null) {
                this.appFS.close();
                this.appFS = null;
            }
        } catch (IOException e) {
            closeFailure = e;
        }
        try {
            if (this.outputFS != null) {
                this.outputFS.close();
                this.outputFS = null;
            }
        } catch (IOException e) {
            if (closeFailure == null) {
                closeFailure = e;
            } else {
                closeFailure.addSuppressed(e);
            }
        }
        if (closeFailure != null) {
            throw new RuntimeException("Exception while closing file systems.", closeFailure);
        }
    }

    /**
     * Logs the incoming tuple and hands it to {@code enqueueForProcessing}.
     *
     * @param t meta-data of the file to stitch
     */
    @Override // com.datatorrent.lib.io.fs.AbstractReconciler
    public void processTuple(T t) {
        LOG.debug("stitchedFileMetaData: {}", t);
        enqueueForProcessing(t);
    }

    /**
     * Merges the output file for a committed tuple.
     *
     * @param t meta-data of the file to stitch
     * @throws RuntimeException wrapping any {@link IOException} raised while merging
     */
    @Override // com.datatorrent.lib.io.fs.AbstractReconciler
    public void processCommittedData(T t) {
        try {
            mergeOutputFile(t);
        } catch (IOException e) {
            throw new RuntimeException("Unable to merge file: " + t.getStitchedFileRelativePath(), e);
        }
    }

    /**
     * Merges the blocks of the given file and records the tuple in {@link #successfulFiles}.
     *
     * @param t meta-data of the file to stitch
     * @throws IOException if merging fails
     */
    public void mergeOutputFile(T t) throws IOException {
        mergeBlocks(t);
        this.successfulFiles.add(t);
        LOG.debug("Completed processing file: {} ", t.getStitchedFileRelativePath());
    }

    /**
     * Deletes leftover temporary files for the target, writes the blocks to a fresh temporary
     * file and renames it to the final path. If a block is missing, the temporary file is deleted
     * and the method returns normally (logged as assumed recovery mode).
     *
     * @param t meta-data of the file to stitch
     * @throws IOException if a file system operation fails
     */
    public void mergeBlocks(T t) throws IOException {
        final Path finalPath = new Path(this.filePath, t.getStitchedFileRelativePath());
        // Matches temporary files in the target directory whose name starts with the target file name.
        PathFilter leftoverTempFiles = new PathFilter() {
            @Override
            public boolean accept(Path candidate) {
                String candidateName = candidate.getName();
                return candidateName.startsWith(finalPath.getName()) && candidateName.endsWith(FileStitcher.PART_FILE_EXTENTION);
            }
        };
        Path parentDir = finalPath.getParent();
        if (this.outputFS.exists(parentDir)) {
            for (FileStatus leftover : this.outputFS.listStatus(parentDir, leftoverTempFiles)) {
                LOG.debug("deleting vagrant file {}", leftover.getPath().getName());
                this.outputFS.delete(leftover.getPath(), true);
            }
        }
        // The timestamp makes the temporary name distinct for each attempt.
        this.tempOutFilePath = new Path(this.filePath, t.getStitchedFileRelativePath() + '.' + System.currentTimeMillis() + PART_FILE_EXTENTION);
        try {
            writeTempOutputFile(t);
            moveToFinalFile(t);
        } catch (BlockNotFoundException e) {
            LOG.warn("Block file {} not found. Assuming recovery mode for file {}. ", e.getBlockPath(), t.getStitchedFileRelativePath());
            // Remove the partially written temporary file.
            this.outputFS.delete(this.tempOutFilePath, false);
        }
    }

    /**
     * Writes every stitch block of the tuple, in list order, to {@link #tempOutFilePath}.
     *
     * @param t meta-data of the file to stitch
     * @return the stream that was written to; it has already been closed when this method returns
     * @throws IOException if creating, writing or closing the stream fails
     * @throws BlockNotFoundException if a block file is missing
     */
    public OutputStream writeTempOutputFile(T t) throws IOException, BlockNotFoundException {
        OutputStream outputStream = getOutputStream(this.tempOutFilePath);
        try {
            for (Synchronizer.StitchBlock block : t.getStitchBlocksList()) {
                block.writeTo(this.appFS, this.blocksDirectoryPath, outputStream);
            }
        } finally {
            outputStream.close();
        }
        return outputStream;
    }

    /**
     * @param path path of the file to create on the output file system
     * @return stream writing to the newly created file
     * @throws IOException if the file cannot be created
     */
    protected OutputStream getOutputStream(Path path) throws IOException {
        return this.outputFS.create(path);
    }

    /**
     * Renames {@link #tempOutFilePath} to the final path of the given tuple.
     *
     * @param t meta-data of the file to stitch
     * @throws IOException if a file system operation fails
     */
    protected void moveToFinalFile(T t) throws IOException {
        moveToFinalFile(this.tempOutFilePath, new Path(this.filePath, t.getStitchedFileRelativePath()));
    }

    /**
     * Moves a file on the output file system, creating the destination directory if needed and
     * replacing an existing destination file. Scheme and authority are stripped from both paths.
     *
     * @param path source path
     * @param path2 destination path
     * @throws IOException if a file system operation fails
     * @throws RuntimeException if the rename reports failure
     */
    public void moveToFinalFile(Path path, Path path2) throws IOException {
        Path source = Path.getPathWithoutSchemeAndAuthority(path);
        Path destination = Path.getPathWithoutSchemeAndAuthority(path2);
        Path destinationDir = destination.getParent();
        if (!this.outputFS.exists(destinationDir)) {
            this.outputFS.mkdirs(destinationDir);
        }
        // Delete any existing destination before renaming onto it.
        if (this.outputFS.exists(destination)) {
            this.outputFS.delete(destination, false);
        }
        if (!this.outputFS.rename(source, destination)) {
            throw new RuntimeException("Unable to move file from " + source + " to " + destination);
        }
        LOG.debug("File {} moved successfully to destination folder.", destination);
    }

    /**
     * @return name of the blocks directory, relative to the application path
     */
    public String getBlocksDirectory() {
        return this.blocksDirectory;
    }

    /**
     * @param str name of the blocks directory, relative to the application path
     */
    public void setBlocksDirectory(String str) {
        this.blocksDirectory = str;
    }

    /**
     * @return base path under which stitched files are created
     */
    public String getFilePath() {
        return this.filePath;
    }

    /**
     * @param str base path under which stitched files are created
     */
    public void setFilePath(String str) {
        this.filePath = str;
    }

    /**
     * @return the write-checksum flag applied to the output file system in {@link #setup}
     */
    public boolean isWriteChecksum() {
        return this.writeChecksum;
    }

    /**
     * @param z the write-checksum flag to apply to the output file system in {@link #setup}
     */
    public void setWriteChecksum(boolean z) {
        this.writeChecksum = z;
    }
}
