package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.SocketCache;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Priority;

@InterfaceAudience.Private
/* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-2.0.3-alpha.jar:org/apache/hadoop/hdfs/DFSInputStream.class */
public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
    private final SocketCache socketCache;
    private final DFSClient dfsClient;
    private final String src;
    private final long prefetchSize;
    private final boolean verifyChecksum;
    private final int timeWindow;
    private int buffersize;
    private final int nCachedConnRetry;
    static final /* synthetic */ boolean $assertionsDisabled;
    private boolean closed = false;
    private BlockReader blockReader = null;
    private LocatedBlocks locatedBlocks = null;
    private long lastBlockBeingWrittenLength = 0;
    private DatanodeInfo currentNode = null;
    private LocatedBlock currentLocatedBlock = null;
    private long pos = 0;
    private long blockEnd = -1;
    private int failures = 0;
    private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = new ConcurrentHashMap<>();
    private final byte[] oneByteBuf = new byte[1];

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-2.0.3-alpha.jar:org/apache/hadoop/hdfs/DFSInputStream$ByteArrayStrategy.class */
    public static class ByteArrayStrategy implements ReaderStrategy {
        final byte[] buf;

        public ByteArrayStrategy(byte[] bArr) {
            this.buf = bArr;
        }

        @Override // org.apache.hadoop.hdfs.DFSInputStream.ReaderStrategy
        public int doRead(BlockReader blockReader, int i, int i2) throws ChecksumException, IOException {
            return blockReader.read(this.buf, i, i2);
        }
    }

    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-2.0.3-alpha.jar:org/apache/hadoop/hdfs/DFSInputStream$ByteBufferStrategy.class */
    private static class ByteBufferStrategy implements ReaderStrategy {
        final ByteBuffer buf;

        ByteBufferStrategy(ByteBuffer byteBuffer) {
            this.buf = byteBuffer;
        }

        @Override // org.apache.hadoop.hdfs.DFSInputStream.ReaderStrategy
        public int doRead(BlockReader blockReader, int i, int i2) throws ChecksumException, IOException {
            int position = this.buf.position();
            int limit = this.buf.limit();
            boolean z = false;
            try {
                int read = blockReader.read(this.buf);
                z = true;
                if (1 == 0) {
                    this.buf.position(position);
                    this.buf.limit(limit);
                }
                return read;
            } catch (Throwable th) {
                if (!z) {
                    this.buf.position(position);
                    this.buf.limit(limit);
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-2.0.3-alpha.jar:org/apache/hadoop/hdfs/DFSInputStream$DNAddrPair.class */
    public static class DNAddrPair {
        DatanodeInfo info;
        InetSocketAddress addr;

        DNAddrPair(DatanodeInfo datanodeInfo, InetSocketAddress inetSocketAddress) {
            this.info = datanodeInfo;
            this.addr = inetSocketAddress;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-2.0.3-alpha.jar:org/apache/hadoop/hdfs/DFSInputStream$ReaderStrategy.class */
    public interface ReaderStrategy {
        int doRead(BlockReader blockReader, int i, int i2) throws ChecksumException, IOException;
    }

    void addToDeadNodes(DatanodeInfo datanodeInfo) {
        this.deadNodes.put(datanodeInfo, datanodeInfo);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DFSInputStream(DFSClient dFSClient, String str, int i, boolean z) throws IOException, UnresolvedLinkException {
        this.buffersize = 1;
        this.dfsClient = dFSClient;
        this.verifyChecksum = z;
        this.buffersize = i;
        this.src = str;
        this.socketCache = dFSClient.socketCache;
        this.prefetchSize = dFSClient.getConf().prefetchSize;
        this.timeWindow = dFSClient.getConf().timeWindow;
        this.nCachedConnRetry = dFSClient.getConf().nCachedConnRetry;
        openInfo();
    }

    synchronized void openInfo() throws IOException, UnresolvedLinkException {
        this.lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
        int i = 3;
        while (i > 0 && this.lastBlockBeingWrittenLength == -1) {
            DFSClient.LOG.warn("Last block locations not available. Datanodes might not have reported blocks completely. Will retry for " + i + " times");
            waitFor(CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
            this.lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
            i--;
        }
        if (i == 0) {
            throw new IOException("Could not obtain the last block locations.");
        }
    }

    private void waitFor(int i) throws IOException {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            throw new IOException("Interrupted while getting the last block length.");
        }
    }

    private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
        LocatedBlock lastLocatedBlock;
        LocatedBlocks locatedBlocks = this.dfsClient.getLocatedBlocks(this.src, 0L, this.prefetchSize);
        if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("newInfo = " + locatedBlocks);
        }
        if (locatedBlocks == null) {
            throw new IOException("Cannot open filename " + this.src);
        }
        if (this.locatedBlocks != null) {
            Iterator<LocatedBlock> it = this.locatedBlocks.getLocatedBlocks().iterator();
            Iterator<LocatedBlock> it2 = locatedBlocks.getLocatedBlocks().iterator();
            while (it.hasNext() && it2.hasNext()) {
                if (!it.next().getBlock().equals(it2.next().getBlock())) {
                    throw new IOException("Blocklist for " + this.src + " has changed!");
                }
            }
        }
        this.locatedBlocks = locatedBlocks;
        long j = 0;
        if (!this.locatedBlocks.isLastBlockComplete() && (lastLocatedBlock = this.locatedBlocks.getLastLocatedBlock()) != null) {
            if (lastLocatedBlock.getLocations().length == 0) {
                return -1L;
            }
            long readBlockLength = readBlockLength(lastLocatedBlock);
            lastLocatedBlock.getBlock().setNumBytes(readBlockLength);
            j = readBlockLength;
        }
        this.currentNode = null;
        return j;
    }

    private long readBlockLength(LocatedBlock locatedBlock) throws IOException {
        long replicaVisibleLength;
        if (!$assertionsDisabled && locatedBlock == null) {
            throw new AssertionError("LocatedBlock cannot be null");
        }
        int length = locatedBlock.getLocations().length;
        for (DatanodeInfo datanodeInfo : locatedBlock.getLocations()) {
            ClientDatanodeProtocol clientDatanodeProtocol = null;
            try {
                try {
                    clientDatanodeProtocol = DFSUtil.createClientDatanodeProtocolProxy(datanodeInfo, this.dfsClient.conf, this.dfsClient.getConf().socketTimeout, this.dfsClient.getConf().connectToDnViaHostname, locatedBlock);
                    replicaVisibleLength = clientDatanodeProtocol.getReplicaVisibleLength(locatedBlock.getBlock());
                } catch (IOException e) {
                    if ((e instanceof RemoteException) && (((RemoteException) e).unwrapRemoteException() instanceof ReplicaNotFoundException)) {
                        length--;
                    }
                    if (DFSClient.LOG.isDebugEnabled()) {
                        DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode " + datanodeInfo + " for block " + locatedBlock.getBlock(), e);
                    }
                    if (clientDatanodeProtocol != null) {
                        RPC.stopProxy(clientDatanodeProtocol);
                    }
                }
                if (replicaVisibleLength >= 0) {
                    if (clientDatanodeProtocol != null) {
                        RPC.stopProxy(clientDatanodeProtocol);
                    }
                    return replicaVisibleLength;
                }
                if (clientDatanodeProtocol != null) {
                    RPC.stopProxy(clientDatanodeProtocol);
                }
            } catch (Throwable th) {
                if (clientDatanodeProtocol != null) {
                    RPC.stopProxy(clientDatanodeProtocol);
                }
                throw th;
            }
        }
        if (length == 0) {
            return 0L;
        }
        throw new IOException("Cannot obtain block length for " + locatedBlock);
    }

    public synchronized long getFileLength() {
        if (this.locatedBlocks == null) {
            return 0L;
        }
        return this.locatedBlocks.getFileLength() + this.lastBlockBeingWrittenLength;
    }

    private synchronized boolean blockUnderConstruction() {
        return this.locatedBlocks.isUnderConstruction();
    }

    public DatanodeInfo getCurrentDatanode() {
        return this.currentNode;
    }

    public synchronized ExtendedBlock getCurrentBlock() {
        if (this.currentLocatedBlock == null) {
            return null;
        }
        return this.currentLocatedBlock.getBlock();
    }

    public synchronized List<LocatedBlock> getAllBlocks() throws IOException {
        return getBlockRange(0L, getFileLength());
    }

    private synchronized LocatedBlock getBlockAt(long j, boolean z) throws IOException {
        LocatedBlock locatedBlock;
        if (!$assertionsDisabled && this.locatedBlocks == null) {
            throw new AssertionError("locatedBlocks is null");
        }
        if (j < 0 || j >= getFileLength()) {
            throw new IOException("offset < 0 || offset > getFileLength(), offset=" + j + ", updatePosition=" + z + ", locatedBlocks=" + this.locatedBlocks);
        }
        if (j >= this.locatedBlocks.getFileLength()) {
            locatedBlock = this.locatedBlocks.getLastLocatedBlock();
        } else {
            int findBlock = this.locatedBlocks.findBlock(j);
            if (findBlock < 0) {
                findBlock = LocatedBlocks.getInsertIndex(findBlock);
                LocatedBlocks locatedBlocks = this.dfsClient.getLocatedBlocks(this.src, j, this.prefetchSize);
                if (!$assertionsDisabled && locatedBlocks == null) {
                    throw new AssertionError("Could not find target position " + j);
                }
                this.locatedBlocks.insertRange(findBlock, locatedBlocks.getLocatedBlocks());
            }
            locatedBlock = this.locatedBlocks.get(findBlock);
        }
        if (z) {
            this.pos = j;
            this.blockEnd = (locatedBlock.getStartOffset() + locatedBlock.getBlockSize()) - 1;
            this.currentLocatedBlock = locatedBlock;
        }
        return locatedBlock;
    }

    private synchronized void fetchBlockAt(long j) throws IOException {
        int findBlock = this.locatedBlocks.findBlock(j);
        if (findBlock < 0) {
            findBlock = LocatedBlocks.getInsertIndex(findBlock);
        }
        LocatedBlocks locatedBlocks = this.dfsClient.getLocatedBlocks(this.src, j, this.prefetchSize);
        if (locatedBlocks == null) {
            throw new IOException("Could not find target position " + j);
        }
        this.locatedBlocks.insertRange(findBlock, locatedBlocks.getLocatedBlocks());
    }

    private synchronized List<LocatedBlock> getBlockRange(long j, long j2) throws IOException {
        if (j >= getFileLength()) {
            throw new IOException("Offset: " + j + " exceeds file length: " + getFileLength());
        }
        long fileLength = this.locatedBlocks.getFileLength();
        boolean z = j < fileLength;
        boolean z2 = j + j2 > fileLength;
        List<LocatedBlock> finalizedBlockRange = z ? getFinalizedBlockRange(j, Math.min(j2, fileLength - j)) : new ArrayList(1);
        if (z2) {
            finalizedBlockRange.add(this.locatedBlocks.getLastLocatedBlock());
        }
        return finalizedBlockRange;
    }

    private synchronized List<LocatedBlock> getFinalizedBlockRange(long j, long j2) throws IOException {
        if (!$assertionsDisabled && this.locatedBlocks == null) {
            throw new AssertionError("locatedBlocks is null");
        }
        ArrayList arrayList = new ArrayList();
        int findBlock = this.locatedBlocks.findBlock(j);
        if (findBlock < 0) {
            findBlock = LocatedBlocks.getInsertIndex(findBlock);
        }
        long j3 = j2;
        long j4 = j;
        while (j3 > 0) {
            LocatedBlock locatedBlock = null;
            if (findBlock < this.locatedBlocks.locatedBlockCount()) {
                locatedBlock = this.locatedBlocks.get(findBlock);
            }
            if (locatedBlock == null || j4 < locatedBlock.getStartOffset()) {
                this.locatedBlocks.insertRange(findBlock, this.dfsClient.getLocatedBlocks(this.src, j4, j3).getLocatedBlocks());
            } else {
                if (!$assertionsDisabled && j4 < locatedBlock.getStartOffset()) {
                    throw new AssertionError("Block not found");
                }
                arrayList.add(locatedBlock);
                long startOffset = (locatedBlock.getStartOffset() + locatedBlock.getBlockSize()) - j4;
                j3 -= startOffset;
                j4 += startOffset;
                findBlock++;
            }
        }
        return arrayList;
    }

    private synchronized DatanodeInfo blockSeekTo(long j) throws IOException {
        DatanodeInfo datanodeInfo;
        if (j >= getFileLength()) {
            throw new IOException("Attempted to read past end of file");
        }
        if (this.blockReader != null) {
            closeBlockReader(this.blockReader);
            this.blockReader = null;
        }
        int i = 1;
        int i2 = 1;
        boolean z = false;
        while (true) {
            LocatedBlock blockAt = getBlockAt(j, true);
            if (!$assertionsDisabled && j != this.pos) {
                throw new AssertionError("Wrong postion " + this.pos + " expect " + j);
            }
            long startOffset = j - blockAt.getStartOffset();
            DNAddrPair chooseDataNode = chooseDataNode(blockAt);
            datanodeInfo = chooseDataNode.info;
            InetSocketAddress inetSocketAddress = chooseDataNode.addr;
            try {
                ExtendedBlock block = blockAt.getBlock();
                this.blockReader = getBlockReader(inetSocketAddress, datanodeInfo, this.src, block, blockAt.getBlockToken(), startOffset, block.getNumBytes() - startOffset, this.buffersize, this.verifyChecksum, this.dfsClient.clientName);
                if (!z) {
                    break;
                }
                DFSClient.LOG.info("Successfully connected to " + inetSocketAddress + " for " + block);
                break;
            } catch (IOException e) {
                if ((e instanceof InvalidEncryptionKeyException) && i2 > 0) {
                    DFSClient.LOG.info("Will fetch a new encryption key and retry, encryption key was invalid when connecting to " + inetSocketAddress + " : " + e);
                    i2--;
                    this.dfsClient.clearDataEncryptionKey();
                } else if (!(e instanceof InvalidBlockTokenException) || i <= 0) {
                    z = true;
                    DFSClient.LOG.warn("Failed to connect to " + inetSocketAddress + " for block, add to deadNodes and continue. " + e, e);
                    addToDeadNodes(datanodeInfo);
                } else {
                    DFSClient.LOG.info("Will fetch a new access token and retry, access token was invalid when connecting to " + inetSocketAddress + " : " + e);
                    i--;
                    fetchBlockAt(j);
                }
            }
        }
        return datanodeInfo;
    }

    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.dfsClient.checkOpen();
        if (this.blockReader != null) {
            closeBlockReader(this.blockReader);
            this.blockReader = null;
        }
        super.close();
        this.closed = true;
    }

    @Override // java.io.InputStream
    public synchronized int read() throws IOException {
        if (read(this.oneByteBuf, 0, 1) <= 0) {
            return -1;
        }
        return this.oneByteBuf[0] & 255;
    }

    /* JADX WARN: Removed duplicated region for block: B:12:0x00ac  */
    /* JADX WARN: Removed duplicated region for block: B:15:0x00d3 A[LOOP:0: B:2:0x0003->B:15:0x00d3, LOOP_END] */
    /* JADX WARN: Removed duplicated region for block: B:16:0x00d0 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:19:0x00b9  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private synchronized int readBuffer(org.apache.hadoop.hdfs.DFSInputStream.ReaderStrategy r6, int r7, int r8, java.util.Map<org.apache.hadoop.hdfs.protocol.ExtendedBlock, java.util.Set<org.apache.hadoop.hdfs.protocol.DatanodeInfo>> r9) throws java.io.IOException {
        /*
            r5 = this;
            r0 = 1
            r11 = r0
        L3:
            r0 = r6
            r1 = r5
            org.apache.hadoop.hdfs.BlockReader r1 = r1.blockReader     // Catch: org.apache.hadoop.fs.ChecksumException -> L10 java.io.IOException -> L61
            r2 = r7
            r3 = r8
            int r0 = r0.doRead(r1, r2, r3)     // Catch: org.apache.hadoop.fs.ChecksumException -> L10 java.io.IOException -> L61
            return r0
        L10:
            r12 = move-exception
            org.apache.commons.logging.Log r0 = org.apache.hadoop.hdfs.DFSClient.LOG
            java.lang.StringBuilder r1 = new java.lang.StringBuilder
            r2 = r1
            r2.<init>()
            java.lang.String r2 = "Found Checksum error for "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r5
            org.apache.hadoop.hdfs.protocol.ExtendedBlock r2 = r2.getCurrentBlock()
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r2 = " from "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r5
            org.apache.hadoop.hdfs.protocol.DatanodeInfo r2 = r2.currentNode
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r2 = " at "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r12
            long r2 = r2.getPos()
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r1 = r1.toString()
            r0.warn(r1)
            r0 = r12
            r10 = r0
            r0 = 0
            r11 = r0
            r0 = r5
            r1 = r5
            org.apache.hadoop.hdfs.protocol.ExtendedBlock r1 = r1.getCurrentBlock()
            r2 = r5
            org.apache.hadoop.hdfs.protocol.DatanodeInfo r2 = r2.currentNode
            r3 = r9
            r0.addIntoCorruptedBlockMap(r1, r2, r3)
            goto La4
        L61:
            r12 = move-exception
            r0 = r11
            if (r0 != 0) goto La0
            org.apache.commons.logging.Log r0 = org.apache.hadoop.hdfs.DFSClient.LOG
            java.lang.StringBuilder r1 = new java.lang.StringBuilder
            r2 = r1
            r2.<init>()
            java.lang.String r2 = "Exception while reading from "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r5
            org.apache.hadoop.hdfs.protocol.ExtendedBlock r2 = r2.getCurrentBlock()
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r2 = " of "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r5
            java.lang.String r2 = r2.src
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r2 = " from "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r5
            org.apache.hadoop.hdfs.protocol.DatanodeInfo r2 = r2.currentNode
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r1 = r1.toString()
            r2 = r12
            r0.warn(r1, r2)
        La0:
            r0 = r12
            r10 = r0
        La4:
            r0 = 0
            r12 = r0
            r0 = r11
            if (r0 == 0) goto Lb9
            r0 = r5
            r1 = r5
            long r1 = r1.pos
            boolean r0 = r0.seekToBlockSource(r1)
            r12 = r0
            goto Lcb
        Lb9:
            r0 = r5
            r1 = r5
            org.apache.hadoop.hdfs.protocol.DatanodeInfo r1 = r1.currentNode
            r0.addToDeadNodes(r1)
            r0 = r5
            r1 = r5
            long r1 = r1.pos
            boolean r0 = r0.seekToNewSource(r1)
            r12 = r0
        Lcb:
            r0 = r12
            if (r0 != 0) goto Ld3
            r0 = r10
            throw r0
        Ld3:
            r0 = 0
            r11 = r0
            goto L3
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.hadoop.hdfs.DFSInputStream.readBuffer(org.apache.hadoop.hdfs.DFSInputStream$ReaderStrategy, int, int, java.util.Map):int");
    }

    private int readWithStrategy(ReaderStrategy readerStrategy, int i, int i2) throws IOException {
        this.dfsClient.checkOpen();
        if (this.closed) {
            throw new IOException("Stream closed");
        }
        HashMap hashMap = new HashMap();
        this.failures = 0;
        if (this.pos >= getFileLength()) {
            return -1;
        }
        int i3 = 2;
        while (i3 > 0) {
            try {
                try {
                    if (this.pos > this.blockEnd || this.currentNode == null) {
                        this.currentNode = blockSeekTo(this.pos);
                    }
                    int readBuffer = readBuffer(readerStrategy, i, (int) Math.min(i2, (this.blockEnd - this.pos) + 1), hashMap);
                    if (readBuffer < 0) {
                        throw new IOException("Unexpected EOS from the reader");
                    }
                    this.pos += readBuffer;
                    if (this.dfsClient.stats != null && readBuffer != -1) {
                        this.dfsClient.stats.incrementBytesRead(readBuffer);
                    }
                    reportCheckSumFailure(hashMap, this.currentLocatedBlock.getLocations().length);
                    return readBuffer;
                } catch (ChecksumException e) {
                    throw e;
                }
            } catch (IOException e2) {
                if (i3 == 1) {
                    try {
                        DFSClient.LOG.warn("DFS Read", e2);
                    } catch (Throwable th) {
                        reportCheckSumFailure(hashMap, this.currentLocatedBlock.getLocations().length);
                        throw th;
                    }
                }
                this.blockEnd = -1L;
                if (this.currentNode != null) {
                    addToDeadNodes(this.currentNode);
                }
                i3--;
                if (i3 == 0) {
                    throw e2;
                }
                reportCheckSumFailure(hashMap, this.currentLocatedBlock.getLocations().length);
            }
        }
        return -1;
    }

    @Override // java.io.InputStream
    public synchronized int read(byte[] bArr, int i, int i2) throws IOException {
        return readWithStrategy(new ByteArrayStrategy(bArr), i, i2);
    }

    @Override // org.apache.hadoop.fs.ByteBufferReadable
    public synchronized int read(ByteBuffer byteBuffer) throws IOException {
        return readWithStrategy(new ByteBufferStrategy(byteBuffer), 0, byteBuffer.remaining());
    }

    private void addIntoCorruptedBlockMap(ExtendedBlock extendedBlock, DatanodeInfo datanodeInfo, Map<ExtendedBlock, Set<DatanodeInfo>> map) {
        Set<DatanodeInfo> hashSet = map.containsKey(extendedBlock) ? map.get(extendedBlock) : new HashSet();
        if (hashSet.contains(datanodeInfo)) {
            return;
        }
        hashSet.add(datanodeInfo);
        map.put(extendedBlock, hashSet);
    }

    private DNAddrPair chooseDataNode(LocatedBlock locatedBlock) throws IOException {
        while (true) {
            DatanodeInfo[] locations = locatedBlock.getLocations();
            try {
                DatanodeInfo bestNode = bestNode(locations, this.deadNodes);
                String xferAddr = bestNode.getXferAddr(this.dfsClient.connectToDnViaHostname());
                if (DFSClient.LOG.isDebugEnabled()) {
                    DFSClient.LOG.debug("Connecting to datanode " + xferAddr);
                }
                return new DNAddrPair(bestNode, NetUtils.createSocketAddr(xferAddr));
            } catch (IOException e) {
                String str = locatedBlock.getBlock() + " file=" + this.src;
                if (this.failures >= this.dfsClient.getMaxBlockAcquireFailures()) {
                    throw new BlockMissingException(this.src, "Could not obtain block: " + str, locatedBlock.getStartOffset());
                }
                if (locations == null || locations.length == 0) {
                    DFSClient.LOG.info("No node available for " + str);
                }
                DFSClient.LOG.info("Could not obtain " + locatedBlock.getBlock() + " from any node: " + e + ". Will get new block locations from namenode and retry...");
                try {
                    double nextDouble = (this.timeWindow * this.failures) + (this.timeWindow * (this.failures + 1) * DFSUtil.getRandom().nextDouble());
                    DFSClient.LOG.warn("DFS chooseDataNode: got # " + (this.failures + 1) + " IOException, will wait for " + nextDouble + " msec.");
                    Thread.sleep((long) nextDouble);
                } catch (InterruptedException e2) {
                }
                this.deadNodes.clear();
                openInfo();
                locatedBlock = getBlockAt(locatedBlock.getStartOffset(), false);
                this.failures++;
            }
        }
    }

    private void fetchBlockByteRange(LocatedBlock locatedBlock, long j, long j2, byte[] bArr, int i, Map<ExtendedBlock, Set<DatanodeInfo>> map) throws IOException {
        int i2 = 1;
        int i3 = 1;
        while (true) {
            locatedBlock = getBlockAt(locatedBlock.getStartOffset(), false);
            DNAddrPair chooseDataNode = chooseDataNode(locatedBlock);
            DatanodeInfo datanodeInfo = chooseDataNode.info;
            InetSocketAddress inetSocketAddress = chooseDataNode.addr;
            BlockReader blockReader = null;
            try {
                try {
                    int i4 = (int) ((j2 - j) + 1);
                    blockReader = getBlockReader(inetSocketAddress, datanodeInfo, this.src, locatedBlock.getBlock(), locatedBlock.getBlockToken(), j, i4, this.buffersize, this.verifyChecksum, this.dfsClient.clientName);
                    int readAll = blockReader.readAll(bArr, i, i4);
                    if (readAll != i4) {
                        throw new IOException("truncated return from reader.read(): excpected " + i4 + ", got " + readAll);
                        break;
                    } else {
                        if (blockReader != null) {
                            closeBlockReader(blockReader);
                            return;
                        }
                        return;
                    }
                } catch (ChecksumException e) {
                    try {
                        DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " + this.src + " at " + locatedBlock.getBlock() + ":" + e.getPos() + " from " + datanodeInfo);
                        addIntoCorruptedBlockMap(locatedBlock.getBlock(), datanodeInfo, map);
                        if (blockReader != null) {
                            closeBlockReader(blockReader);
                        }
                        addToDeadNodes(datanodeInfo);
                    } catch (Throwable th) {
                        if (blockReader != null) {
                            closeBlockReader(blockReader);
                        }
                        throw th;
                    }
                }
            } catch (AccessControlException e2) {
                DFSClient.LOG.warn("Short circuit access failed ", e2);
                this.dfsClient.disableShortCircuit();
                if (blockReader != null) {
                    closeBlockReader(blockReader);
                }
            } catch (IOException e3) {
                if ((e3 instanceof InvalidEncryptionKeyException) && i3 > 0) {
                    DFSClient.LOG.info("Will fetch a new encryption key and retry, encryption key was invalid when connecting to " + inetSocketAddress + " : " + e3);
                    i3--;
                    this.dfsClient.clearDataEncryptionKey();
                } else if (!(e3 instanceof InvalidBlockTokenException) || i2 <= 0) {
                    DFSClient.LOG.warn("Failed to connect to " + inetSocketAddress + " for file " + this.src + " for block " + locatedBlock.getBlock() + ":" + e3);
                    if (DFSClient.LOG.isDebugEnabled()) {
                        DFSClient.LOG.debug("Connection failure ", e3);
                    }
                } else {
                    DFSClient.LOG.info("Will get a new access token and retry, access token was invalid when connecting to " + inetSocketAddress + " : " + e3);
                    i2--;
                    fetchBlockAt(locatedBlock.getStartOffset());
                    if (blockReader != null) {
                        closeBlockReader(blockReader);
                    }
                }
                if (blockReader != null) {
                    closeBlockReader(blockReader);
                }
                addToDeadNodes(datanodeInfo);
            }
        }
    }

    private void closeBlockReader(BlockReader blockReader) throws IOException {
        if (blockReader.hasSentStatusCode()) {
            IOStreamPair streams = blockReader.getStreams();
            this.socketCache.put(blockReader.takeSocket(), streams);
        }
        blockReader.close();
    }

    protected BlockReader getBlockReader(InetSocketAddress inetSocketAddress, DatanodeInfo datanodeInfo, String str, ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, long j, long j2, int i, boolean z, String str2) throws IOException {
        Socket socket;
        if (this.dfsClient.shouldTryShortCircuitRead(inetSocketAddress) && !blockUnderConstruction()) {
            return DFSClient.getLocalBlockReader(this.dfsClient.conf, this.src, extendedBlock, token, datanodeInfo, this.dfsClient.hdfsTimeout, j, this.dfsClient.connectToDnViaHostname());
        }
        IOException iOException = null;
        boolean z2 = true;
        for (int i2 = 0; i2 <= this.nCachedConnRetry && z2; i2++) {
            SocketCache.SocketAndStreams socketAndStreams = i2 < this.nCachedConnRetry ? this.socketCache.get(inetSocketAddress) : null;
            if (socketAndStreams == null) {
                z2 = false;
                socket = this.dfsClient.socketFactory.createSocket();
                socket.setTcpNoDelay(true);
                NetUtils.connect(socket, inetSocketAddress, this.dfsClient.getRandomLocalInterfaceAddr(), this.dfsClient.getConf().socketTimeout);
                socket.setSoTimeout(this.dfsClient.getConf().socketTimeout);
            } else {
                socket = socketAndStreams.sock;
            }
            try {
                return BlockReaderFactory.newBlockReader(this.dfsClient.getConf(), socket, str, extendedBlock, token, j, j2, i, z, str2, this.dfsClient.getDataEncryptionKey(), socketAndStreams == null ? null : socketAndStreams.ioStreams);
            } catch (IOException e) {
                DFSClient.LOG.debug("Error making BlockReader. Closing stale " + socket, e);
                if (socketAndStreams != null) {
                    socketAndStreams.close();
                } else {
                    socket.close();
                }
                iOException = e;
            }
        }
        throw iOException;
    }

    @Override // org.apache.hadoop.fs.FSInputStream, org.apache.hadoop.fs.PositionedReadable
    public int read(long j, byte[] bArr, int i, int i2) throws IOException {
        this.dfsClient.checkOpen();
        if (this.closed) {
            throw new IOException("Stream closed");
        }
        this.failures = 0;
        long fileLength = getFileLength();
        if (j < 0 || j >= fileLength) {
            return -1;
        }
        int i3 = i2;
        if (j + i2 > fileLength) {
            i3 = (int) (fileLength - j);
        }
        List<LocatedBlock> blockRange = getBlockRange(j, i3);
        int i4 = i3;
        HashMap hashMap = new HashMap();
        for (LocatedBlock locatedBlock : blockRange) {
            long startOffset = j - locatedBlock.getStartOffset();
            long min = Math.min(i4, locatedBlock.getBlockSize() - startOffset);
            try {
                fetchBlockByteRange(locatedBlock, startOffset, (startOffset + min) - 1, bArr, i, hashMap);
                reportCheckSumFailure(hashMap, locatedBlock.getLocations().length);
                i4 = (int) (i4 - min);
                j += min;
                i = (int) (i + min);
            } catch (Throwable th) {
                reportCheckSumFailure(hashMap, locatedBlock.getLocations().length);
                throw th;
            }
        }
        if (!$assertionsDisabled && i4 != 0) {
            throw new AssertionError("Wrong number of bytes read.");
        }
        if (this.dfsClient.stats != null) {
            this.dfsClient.stats.incrementBytesRead(i3);
        }
        return i3;
    }

    private void reportCheckSumFailure(Map<ExtendedBlock, Set<DatanodeInfo>> map, int i) {
        if (map.isEmpty()) {
            return;
        }
        Map.Entry<ExtendedBlock, Set<DatanodeInfo>> next = map.entrySet().iterator().next();
        ExtendedBlock key = next.getKey();
        Set<DatanodeInfo> value = next.getValue();
        if ((value.size() < i && value.size() > 0) || (i == 1 && value.size() == i)) {
            DatanodeInfo[] datanodeInfoArr = new DatanodeInfo[value.size()];
            int i2 = 0;
            Iterator<DatanodeInfo> it = value.iterator();
            while (it.hasNext()) {
                int i3 = i2;
                i2++;
                datanodeInfoArr[i3] = it.next();
            }
            this.dfsClient.reportChecksumFailure(this.src, new LocatedBlock[]{new LocatedBlock(key, datanodeInfoArr)});
        }
        map.clear();
    }

    @Override // java.io.InputStream
    public long skip(long j) throws IOException {
        if (j <= 0) {
            return j < 0 ? -1L : 0L;
        }
        long pos = getPos();
        long fileLength = getFileLength();
        if (j + pos > fileLength) {
            j = fileLength - pos;
        }
        seek(pos + j);
        return j;
    }

    @Override // org.apache.hadoop.fs.FSInputStream, org.apache.hadoop.fs.Seekable
    public synchronized void seek(long j) throws IOException {
        int i;
        if (j > getFileLength()) {
            throw new IOException("Cannot seek after EOF");
        }
        if (j < 0) {
            throw new IOException("Cannot seek to negative offset");
        }
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }
        boolean z = false;
        if (this.pos <= j && j <= this.blockEnd && (i = (int) (j - this.pos)) <= 131072) {
            try {
                this.pos += this.blockReader.skip(i);
                if (this.pos == j) {
                    z = true;
                }
            } catch (IOException e) {
                if (DFSClient.LOG.isDebugEnabled()) {
                    DFSClient.LOG.debug("Exception while seek to " + j + " from " + getCurrentBlock() + " of " + this.src + " from " + this.currentNode, e);
                }
            }
        }
        if (z) {
            return;
        }
        this.pos = j;
        this.blockEnd = -1L;
    }

    private synchronized boolean seekToBlockSource(long j) throws IOException {
        this.currentNode = blockSeekTo(j);
        return true;
    }

    @Override // org.apache.hadoop.fs.FSInputStream, org.apache.hadoop.fs.Seekable
    public synchronized boolean seekToNewSource(long j) throws IOException {
        boolean containsKey = this.deadNodes.containsKey(this.currentNode);
        addToDeadNodes(this.currentNode);
        DatanodeInfo datanodeInfo = this.currentNode;
        DatanodeInfo blockSeekTo = blockSeekTo(j);
        if (!containsKey) {
            this.deadNodes.remove(datanodeInfo);
        }
        if (datanodeInfo.getStorageID().equals(blockSeekTo.getStorageID())) {
            return false;
        }
        this.currentNode = blockSeekTo;
        return true;
    }

    @Override // org.apache.hadoop.fs.FSInputStream, org.apache.hadoop.fs.Seekable
    public synchronized long getPos() throws IOException {
        return this.pos;
    }

    @Override // java.io.InputStream
    public synchronized int available() throws IOException {
        if (this.closed) {
            throw new IOException("Stream closed");
        }
        long fileLength = getFileLength() - this.pos;
        return fileLength <= 2147483647L ? (int) fileLength : Priority.OFF_INT;
    }

    @Override // java.io.InputStream
    public boolean markSupported() {
        return false;
    }

    @Override // java.io.InputStream
    public void mark(int i) {
    }

    @Override // java.io.InputStream
    public void reset() throws IOException {
        throw new IOException("Mark/reset not supported");
    }

    static DatanodeInfo bestNode(DatanodeInfo[] datanodeInfoArr, AbstractMap<DatanodeInfo, DatanodeInfo> abstractMap) throws IOException {
        if (datanodeInfoArr != null) {
            for (int i = 0; i < datanodeInfoArr.length; i++) {
                if (!abstractMap.containsKey(datanodeInfoArr[i])) {
                    return datanodeInfoArr[i];
                }
            }
        }
        throw new IOException("No live nodes contain current block");
    }

    static {
        $assertionsDisabled = !DFSInputStream.class.desiredAssertionStatus();
    }
}
