package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.fs.AbstractFileSplitter;
import com.datatorrent.lib.io.fs.FileStitcher;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/fs/Synchronizer.class */
/**
 * Operator that pairs file metadata with the metadata of the file's blocks.
 *
 * <p>File metadata arrives on {@link #filesMetadataInput} and block metadata arrives on
 * {@link #blocksMetadataInput}, in any order. Once the number of received blocks for a file equals
 * {@code FileMetadata.getNumberOfBlocks()} and every id in {@code FileMetadata.getBlockIds()} has
 * been received, an {@link OutputFileMetadata} is emitted on {@link #trigger} and the bookkeeping
 * kept for that file is discarded.
 */
public class Synchronizer extends BaseOperator {
    private static final Logger LOG = LoggerFactory.getLogger(Synchronizer.class);

    /** File metadata keyed by file path, for files whose trigger has not been emitted yet. */
    private Map<String, AbstractFileSplitter.FileMetadata> fileMetadataMap = Maps.newHashMap();

    /** Block metadata received so far: file path -> (block id -> block metadata). */
    private Map<String, Map<Long, BlockMetadata.FileBlockMetadata>> fileToReceivedBlocksMetadataMap = Maps.newHashMap();

    /** Input port receiving the metadata of each file. */
    public final transient DefaultInputPort<AbstractFileSplitter.FileMetadata> filesMetadataInput = new DefaultInputPort<AbstractFileSplitter.FileMetadata>() {
        public void process(AbstractFileSplitter.FileMetadata fileMetadata) {
            String filePath = fileMetadata.getFilePath();
            Map<Long, BlockMetadata.FileBlockMetadata> receivedBlocksMetadata = getReceivedBlocksMetadata(filePath);
            fileMetadataMap.put(filePath, fileMetadata);
            emitTriggerIfAllBlocksReceived(fileMetadata, receivedBlocksMetadata);
        }
    };

    /** Input port receiving the metadata of each individual block. */
    public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blocksMetadataInput = new DefaultInputPort<BlockMetadata.FileBlockMetadata>() {
        public void process(BlockMetadata.FileBlockMetadata fileBlockMetadata) {
            String filePath = fileBlockMetadata.getFilePath();
            LOG.debug("received blockId {} for file {}", fileBlockMetadata.getBlockId(), filePath);
            Map<Long, BlockMetadata.FileBlockMetadata> receivedBlocksMetadata = getReceivedBlocksMetadata(filePath);
            receivedBlocksMetadata.put(fileBlockMetadata.getBlockId(), fileBlockMetadata);
            // The file metadata may not have arrived yet; in that case the block is only recorded.
            AbstractFileSplitter.FileMetadata fileMetadata = fileMetadataMap.get(filePath);
            if (fileMetadata != null) {
                emitTriggerIfAllBlocksReceived(fileMetadata, receivedBlocksMetadata);
            }
        }
    };

    /** Output port on which an {@link OutputFileMetadata} is emitted once all blocks of a file are known. */
    public final transient DefaultOutputPort<OutputFileMetadata> trigger = new DefaultOutputPort<>();

    /**
     * File metadata extended with the ordered list of blocks to stitch together.
     */
    public static class OutputFileMetadata extends AbstractFileSplitter.FileMetadata implements StitchedFileMetaData {
        private List<StitchBlock> stitchBlocksList;

        /** Creates metadata with an empty block list. */
        protected OutputFileMetadata() {
            this.stitchBlocksList = Lists.newArrayList();
        }

        /**
         * Creates metadata from existing file metadata and the blocks that make up the file.
         *
         * @param fileMetadata metadata passed to the superclass constructor
         * @param stitchBlocksList blocks of the file, stored by reference
         */
        protected OutputFileMetadata(AbstractFileSplitter.FileMetadata fileMetadata, List<StitchBlock> stitchBlocksList) {
            super(fileMetadata);
            this.stitchBlocksList = stitchBlocksList;
        }

        /**
         * Creates metadata for the given file path with an empty block list.
         *
         * @param filePath path passed to the superclass constructor
         */
        public OutputFileMetadata(@NotNull String filePath) {
            super(filePath);
            // Start with an empty list, like the no-arg constructor, so getStitchBlocksList() never returns null.
            this.stitchBlocksList = Lists.newArrayList();
        }

        /**
         * @return the relative path of this file, as reported by {@code getRelativePath()}
         */
        @Override
        public String getStitchedFileRelativePath() {
            return getRelativePath();
        }

        /**
         * @return the blocks to stitch; this is the stored list, not a copy
         */
        @Override
        public List<StitchBlock> getStitchBlocksList() {
            return this.stitchBlocksList;
        }

        /**
         * Replaces the list of blocks to stitch.
         *
         * @param stitchBlocksList new block list, stored by reference
         */
        public void setOutputBlockMetaDataList(List<StitchBlock> stitchBlocksList) {
            this.stitchBlocksList = stitchBlocksList;
        }
    }

    /**
     * A block that can write its contents to an output stream.
     */
    public interface StitchBlock {
        /**
         * Writes the contents of this block to the given stream.
         *
         * @param blocksFileSystem file system holding the block
         * @param blocksDirectory directory in which the block is stored
         * @param outputStream destination of the block data
         * @throws IOException on I/O failure
         * @throws FileStitcher.BlockNotFoundException if the block cannot be found
         */
        void writeTo(FileSystem blocksFileSystem, String blocksDirectory, OutputStream outputStream) throws IOException, FileStitcher.BlockNotFoundException;

        /**
         * @return the id of this block
         */
        long getBlockId();
    }

    /**
     * Block metadata that additionally records the relative path of the source file and whether the
     * block is the last one of that file, and that knows how to copy the block's data to a stream.
     */
    public static class StitchBlockMetaData extends BlockMetadata.FileBlockMetadata implements StitchBlock {
        /** Size in bytes of the buffer used when copying block data. */
        public static final int BUFFER_SIZE = 65536;

        /** Relative path of the source file this block belongs to. */
        String sourceRelativePath;

        /** Whether this block is the last block of the source file. */
        boolean isLastBlockSource;

        /** Creates an instance with unset fields. */
        public StitchBlockMetaData() {
        }

        /**
         * Copies the attributes of the given block metadata and adds source-file information.
         *
         * @param fileBlockMetadata block metadata whose attributes are copied
         * @param sourceRelativePath relative path of the source file
         * @param isLastBlockSource whether the block is the last block of the source file
         */
        public StitchBlockMetaData(BlockMetadata.FileBlockMetadata fileBlockMetadata, String sourceRelativePath, boolean isLastBlockSource) {
            super(fileBlockMetadata.getFilePath(), fileBlockMetadata.getBlockId(), fileBlockMetadata.getOffset(), fileBlockMetadata.getLength(), fileBlockMetadata.isLastBlock(), fileBlockMetadata.getPreviousBlockId());
            this.sourceRelativePath = sourceRelativePath;
            this.isLastBlockSource = isLastBlockSource;
        }

        /**
         * @return the relative path of the source file
         */
        public String getSourceRelativePath() {
            return this.sourceRelativePath;
        }

        /**
         * @param sourceRelativePath the relative path of the source file
         */
        public void setSourceRelativePath(String sourceRelativePath) {
            this.sourceRelativePath = sourceRelativePath;
        }

        /**
         * @return whether this block is the last block of the source file
         */
        public boolean isLastBlockSource() {
            return this.isLastBlockSource;
        }

        /**
         * @param isLastBlockSource whether this block is the last block of the source file
         */
        public void setLastBlockSource(boolean isLastBlockSource) {
            this.isLastBlockSource = isLastBlockSource;
        }

        /**
         * Copies the whole block file, named after the block id inside {@code blocksDirectory}, to
         * the given stream.
         *
         * @param blocksFileSystem file system holding the block file
         * @param blocksDirectory directory containing the block file
         * @param outputStream destination of the block data; it is not closed by this method
         * @throws IOException on I/O failure
         * @throws FileStitcher.BlockNotFoundException if the block file does not exist
         */
        @Override
        public void writeTo(FileSystem blocksFileSystem, String blocksDirectory, OutputStream outputStream) throws IOException, FileStitcher.BlockNotFoundException {
            Path blockPath = new Path(blocksDirectory, Long.toString(getBlockId()));
            if (!blocksFileSystem.exists(blockPath)) {
                throw new FileStitcher.BlockNotFoundException(blockPath);
            }
            writeTo(blocksFileSystem, blocksDirectory, outputStream, 0L, blocksFileSystem.getFileStatus(blockPath).getLen());
        }

        /**
         * Copies up to {@code length} bytes of the block file, starting at {@code offset}, to the
         * given stream. Fewer bytes are written if the block file ends first; nothing is written
         * when {@code length} is not positive.
         *
         * @param blocksFileSystem file system holding the block file
         * @param blocksDirectory directory containing the block file
         * @param outputStream destination of the block data; it is not closed by this method
         * @param offset number of bytes to skip at the start of the block file
         * @param length maximum number of bytes to copy
         * @throws IOException on I/O failure
         * @throws FileStitcher.BlockNotFoundException if the block file does not exist
         */
        public void writeTo(FileSystem blocksFileSystem, String blocksDirectory, OutputStream outputStream, long offset, long length) throws IOException, FileStitcher.BlockNotFoundException {
            Path blockPath = new Path(blocksDirectory, Long.toString(getBlockId()));
            if (!blocksFileSystem.exists(blockPath)) {
                throw new FileStitcher.BlockNotFoundException(blockPath);
            }
            byte[] buffer = new byte[BUFFER_SIZE];
            try (FSDataInputStream inputStream = blocksFileSystem.open(blockPath)) {
                // skip() may skip fewer bytes than requested, so repeat until the offset is reached
                // or the stream makes no further progress (for example at end of file).
                long toSkip = offset;
                while (toSkip > 0) {
                    long skipped = inputStream.skip(toSkip);
                    if (skipped <= 0) {
                        break;
                    }
                    toSkip -= skipped;
                }

                long remaining = length;
                while (remaining > 0) {
                    // Take the minimum in long arithmetic first: casting 'remaining' to int before
                    // the comparison would wrap for values of 2 GiB and above.
                    int chunkSize = (int) Math.min(BUFFER_SIZE, remaining);
                    int bytesRead = inputStream.read(buffer, 0, chunkSize);
                    if (bytesRead == -1) {
                        break;
                    }
                    outputStream.write(buffer, 0, bytesRead);
                    remaining -= bytesRead;
                }
            }
        }
    }

    /**
     * Metadata of a file that is assembled from a list of {@link StitchBlock}s.
     */
    public interface StitchedFileMetaData {
        /**
         * @return the relative path of the stitched file
         */
        String getStitchedFileRelativePath();

        /**
         * @return the blocks that make up the file
         */
        List<StitchBlock> getStitchBlocksList();
    }

    /**
     * Delegates to the superclass; this operator needs no additional setup.
     *
     * @param operatorContext context passed through to the superclass
     */
    public void setup(Context.OperatorContext operatorContext) {
        super.setup(operatorContext);
    }

    /**
     * Delegates to the superclass; no per-window initialisation is done here.
     *
     * @param windowId id passed through to the superclass
     */
    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
    }

    /** Does nothing at the end of a window. */
    public void endWindow() {
    }

    /**
     * Emits an {@link OutputFileMetadata} on {@link #trigger} if all blocks of the file have been
     * received, and then discards the state kept for the file.
     *
     * <p>If the number of received blocks differs from the file's block count, the file metadata
     * is stored so that later block tuples can complete it. If the counts match but one of the
     * file's block ids is missing, nothing happens.
     *
     * @param fileMetadata metadata of the file to check
     * @param receivedBlocksMetadata blocks received so far for the file, keyed by block id
     */
    public void emitTriggerIfAllBlocksReceived(AbstractFileSplitter.FileMetadata fileMetadata, Map<Long, BlockMetadata.FileBlockMetadata> receivedBlocksMetadata) {
        String filePath = fileMetadata.getFilePath();
        if (receivedBlocksMetadata.size() != fileMetadata.getNumberOfBlocks()) {
            // Some blocks are still outstanding.
            this.fileMetadataMap.put(filePath, fileMetadata);
            return;
        }
        if (!fileMetadata.isDirectory()) {
            for (long blockId : fileMetadata.getBlockIds()) {
                if (!receivedBlocksMetadata.containsKey(blockId)) {
                    // Counts match but an expected block id is absent; keep waiting.
                    return;
                }
            }
        }
        long fileProcessingTime = System.currentTimeMillis() - fileMetadata.getDiscoverTime();
        List<StitchBlock> outputBlocks = constructOutputBlockMetadataList(fileMetadata);
        this.trigger.emit(new OutputFileMetadata(fileMetadata, outputBlocks));
        LOG.debug("Total time taken to process the file {} is {} ms", fileMetadata.getFilePath(), fileProcessingTime);
        this.fileMetadataMap.remove(filePath);
        // The received blocks are no longer needed once the trigger is out. Keeping them would let
        // the operator state grow with every processed file and would leave stale entries behind
        // if the same path is processed again.
        this.fileToReceivedBlocksMetadataMap.remove(filePath);
    }

    /**
     * Builds the list of stitch blocks for a file, in the order of the file's block ids. The last
     * entry is flagged as the last block of the source file. Directories yield an empty list.
     *
     * @param fileMetadata metadata of the file whose blocks have all been received
     * @return a new list with one {@link StitchBlockMetaData} per block id
     */
    private List<StitchBlock> constructOutputBlockMetadataList(AbstractFileSplitter.FileMetadata fileMetadata) {
        List<StitchBlock> outputBlocks = Lists.newArrayList();
        if (fileMetadata.isDirectory()) {
            return outputBlocks;
        }
        Map<Long, BlockMetadata.FileBlockMetadata> receivedBlocks = this.fileToReceivedBlocksMetadataMap.get(fileMetadata.getFilePath());
        long[] blockIds = fileMetadata.getBlockIds();
        int lastIndex = blockIds.length - 1;
        for (int index = 0; index < blockIds.length; index++) {
            BlockMetadata.FileBlockMetadata blockMetadata = receivedBlocks.get(blockIds[index]);
            outputBlocks.add(new StitchBlockMetaData(blockMetadata, fileMetadata.getRelativePath(), index == lastIndex));
        }
        return outputBlocks;
    }

    /**
     * Returns the map of blocks received so far for the given file, creating and registering an
     * empty one if none exists yet.
     *
     * @param filePath path of the file
     * @return the mutable map of received blocks, keyed by block id
     */
    public Map<Long, BlockMetadata.FileBlockMetadata> getReceivedBlocksMetadata(String filePath) {
        Map<Long, BlockMetadata.FileBlockMetadata> receivedBlocks = this.fileToReceivedBlocksMetadataMap.get(filePath);
        if (receivedBlocks == null) {
            receivedBlocks = new HashMap<>();
            this.fileToReceivedBlocksMetadataMap.put(filePath, receivedBlocks);
        }
        return receivedBlocks;
    }
}
