package org.apache.hadoop.hdfs;

import com.google.common.util.concurrent.Uninterruptibles;
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ftp.FtpConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:lib/hadoop-hdfs-2.6.5-tests.jar:org/apache/hadoop/hdfs/TestBlockReaderFactory.class */
public class TestBlockReaderFactory {
    static final Log LOG = LogFactory.getLog(TestBlockReaderFactory.class);

    @Before
    public void init() {
        DomainSocket.disableBindPathValidation();
        Assume.assumeThat(DomainSocket.getLoadingFailureReason(), CoreMatchers.equalTo(null));
    }

    @After
    public void cleanup() {
        DFSInputStream.tcpReadsDisabledForTesting = false;
        BlockReaderFactory.createShortCircuitReplicaInfoCallback = null;
    }

    public static Configuration createShortCircuitConf(String str, TemporarySocketDirectory temporarySocketDirectory) {
        Configuration configuration = new Configuration();
        configuration.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, str);
        configuration.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, FtpConfigKeys.BLOCK_SIZE_DEFAULT);
        configuration.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, new File(temporarySocketDirectory.getDir(), str + "._PORT").getAbsolutePath());
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
        return configuration;
    }

    @Test(timeout = 60000)
    public void testFallbackFromShortCircuitToUnixDomainTraffic() throws Exception {
        DFSInputStream.tcpReadsDisabledForTesting = true;
        TemporarySocketDirectory temporarySocketDirectory = new TemporarySocketDirectory();
        Configuration createShortCircuitConf = createShortCircuitConf("testFallbackFromShortCircuitToUnixDomainTraffic", temporarySocketDirectory);
        createShortCircuitConf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext");
        createShortCircuitConf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
        Configuration configuration = new Configuration(createShortCircuitConf);
        configuration.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
        MiniDFSCluster build = new MiniDFSCluster.Builder(configuration).numDataNodes(1).build();
        build.waitActive();
        FileSystem fileSystem = FileSystem.get(build.getURI(0), createShortCircuitConf);
        DFSTestUtil.createFile(fileSystem, new Path("/test_file"), 8193L, (short) 1, 1027565L);
        Assert.assertTrue(Arrays.equals(DFSTestUtil.readFileBuffer(fileSystem, new Path("/test_file")), DFSTestUtil.calculateFileContentsFromSeed(1027565L, 8193)));
        build.shutdown();
        temporarySocketDirectory.close();
    }

    @Test(timeout = 60000)
    public void testMultipleWaitersOnShortCircuitCache() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        BlockReaderFactory.createShortCircuitReplicaInfoCallback = new ShortCircuitCache.ShortCircuitReplicaCreator() { // from class: org.apache.hadoop.hdfs.TestBlockReaderFactory.1
            @Override // org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator
            public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                Uninterruptibles.awaitUninterruptibly(countDownLatch);
                if (atomicBoolean.compareAndSet(true, false)) {
                    return null;
                }
                Assert.fail("there were multiple calls to createShortCircuitReplicaInfo.  Only one was expected.");
                return null;
            }
        };
        TemporarySocketDirectory temporarySocketDirectory = new TemporarySocketDirectory();
        MiniDFSCluster build = new MiniDFSCluster.Builder(createShortCircuitConf("testMultipleWaitersOnShortCircuitCache", temporarySocketDirectory)).numDataNodes(1).build();
        build.waitActive();
        final DistributedFileSystem fileSystem = build.getFileSystem();
        DFSTestUtil.createFile(fileSystem, new Path("/test_file"), 4000L, (short) 1, 1027565L);
        Runnable runnable = new Runnable() { // from class: org.apache.hadoop.hdfs.TestBlockReaderFactory.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    byte[] readFileBuffer = DFSTestUtil.readFileBuffer(fileSystem, new Path("/test_file"));
                    Assert.assertFalse(atomicBoolean.get());
                    Assert.assertTrue(Arrays.equals(readFileBuffer, DFSTestUtil.calculateFileContentsFromSeed(1027565L, 4000)));
                } catch (Throwable th) {
                    TestBlockReaderFactory.LOG.error("readerRunnable error", th);
                    atomicBoolean2.set(true);
                }
            }
        };
        Thread[] threadArr = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threadArr[i] = new Thread(runnable);
            threadArr[i].start();
        }
        Thread.sleep(500L);
        countDownLatch.countDown();
        for (int i2 = 0; i2 < 10; i2++) {
            Uninterruptibles.joinUninterruptibly(threadArr[i2]);
        }
        build.shutdown();
        temporarySocketDirectory.close();
        Assert.assertFalse(atomicBoolean2.get());
    }

    @Test(timeout = 60000)
    public void testShortCircuitCacheTemporaryFailure() throws Exception {
        BlockReaderTestUtil.enableBlockReaderFactoryTracing();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        BlockReaderFactory.createShortCircuitReplicaInfoCallback = new ShortCircuitCache.ShortCircuitReplicaCreator() { // from class: org.apache.hadoop.hdfs.TestBlockReaderFactory.3
            @Override // org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator
            public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                if (!atomicBoolean.get()) {
                    return null;
                }
                Uninterruptibles.sleepUninterruptibly(2L, TimeUnit.SECONDS);
                return new ShortCircuitReplicaInfo();
            }
        };
        TemporarySocketDirectory temporarySocketDirectory = new TemporarySocketDirectory();
        final MiniDFSCluster build = new MiniDFSCluster.Builder(createShortCircuitConf("testShortCircuitCacheTemporaryFailure", temporarySocketDirectory)).numDataNodes(1).build();
        build.waitActive();
        DistributedFileSystem fileSystem = build.getFileSystem();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        DFSTestUtil.createFile(fileSystem, new Path("/test_file"), 4000L, (short) 1, 1027565L);
        Runnable runnable = new Runnable() { // from class: org.apache.hadoop.hdfs.TestBlockReaderFactory.4
            /* JADX WARN: Finally extract failed */
            @Override // java.lang.Runnable
            public void run() {
                try {
                    LocatedBlock locatedBlock = build.getNameNode().getRpcServer().getBlockLocations("/test_file", 0L, 4000L).getLocatedBlocks().get(0);
                    BlockReader blockReader = null;
                    try {
                        try {
                            blockReader = BlockReaderTestUtil.getBlockReader(build, locatedBlock, 0, 4000);
                            Assert.fail("expected getBlockReader to fail the first time.");
                            if (blockReader != null) {
                                blockReader.close();
                            }
                        } finally {
                            if (blockReader != null) {
                                blockReader.close();
                            }
                        }
                    } catch (Throwable th) {
                        Assert.assertTrue("expected to see 'TCP reads were disabled for testing' in exception " + th, th.getMessage().contains("TCP reads were disabled for testing"));
                        if (blockReader != null) {
                            blockReader.close();
                        }
                    }
                    countDownLatch.countDown();
                    countDownLatch2.await();
                    try {
                        try {
                            blockReader = BlockReaderTestUtil.getBlockReader(build, locatedBlock, 0, 4000);
                            if (blockReader != null) {
                                blockReader.close();
                            }
                        } catch (Throwable th2) {
                            TestBlockReaderFactory.LOG.error("error trying to retrieve a block reader the second time.", th2);
                            throw th2;
                        }
                    } catch (Throwable th3) {
                        throw th3;
                    }
                } catch (Throwable th4) {
                    TestBlockReaderFactory.LOG.error("getBlockReader failure", th4);
                    atomicBoolean2.set(true);
                }
            }
        };
        Thread[] threadArr = new Thread[2];
        for (int i = 0; i < 2; i++) {
            threadArr[i] = new Thread(runnable);
            threadArr[i].start();
        }
        countDownLatch.await();
        atomicBoolean.set(false);
        countDownLatch2.countDown();
        for (int i2 = 0; i2 < 2; i2++) {
            Uninterruptibles.joinUninterruptibly(threadArr[i2]);
        }
        build.shutdown();
        temporarySocketDirectory.close();
        Assert.assertFalse(atomicBoolean2.get());
    }

    @Test
    public void testShortCircuitReadFromServerWithoutShm() throws Exception {
        Configuration createShortCircuitConf = createShortCircuitConf("testShortCircuitReadFromServerWithoutShm", new TemporarySocketDirectory());
        Configuration configuration = new Configuration(createShortCircuitConf);
        configuration.setInt(DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        MiniDFSCluster build = new MiniDFSCluster.Builder(configuration).numDataNodes(1).build();
        build.waitActive();
        createShortCircuitConf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, "testShortCircuitReadFromServerWithoutShm_clientContext");
        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) FileSystem.get(build.getURI(0), createShortCircuitConf);
        DFSTestUtil.createFile(distributedFileSystem, new Path("/test_file"), 4000L, (short) 1, 1027564L);
        Assert.assertTrue(Arrays.equals(DFSTestUtil.readFileBuffer(distributedFileSystem, new Path("/test_file")), DFSTestUtil.calculateFileContentsFromSeed(1027564L, 4000)));
        ShortCircuitCache shortCircuitCache = distributedFileSystem.dfs.getClientContext().getShortCircuitCache();
        final DatanodeInfo datanodeInfo = new DatanodeInfo(build.getDataNodes().get(0).getDatanodeId());
        shortCircuitCache.getDfsClientShmManager().visit(new DfsClientShmManager.Visitor() { // from class: org.apache.hadoop.hdfs.TestBlockReaderFactory.5
            @Override // org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor
            public void visit(HashMap<DatanodeInfo, DfsClientShmManager.PerDatanodeVisitorInfo> hashMap) throws IOException {
                Assert.assertEquals(1L, hashMap.size());
                Assert.assertTrue(hashMap.get(datanodeInfo).disabled);
                Assert.assertEquals(0L, r0.full.size());
                Assert.assertEquals(0L, r0.notFull.size());
            }
        });
        build.shutdown();
    }

    @Test
    public void testShortCircuitReadFromClientWithoutShm() throws Exception {
        Configuration createShortCircuitConf = createShortCircuitConf("testShortCircuitReadWithoutShm", new TemporarySocketDirectory());
        Configuration configuration = new Configuration(createShortCircuitConf);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        MiniDFSCluster build = new MiniDFSCluster.Builder(configuration).numDataNodes(1).build();
        build.waitActive();
        createShortCircuitConf.setInt(DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
        createShortCircuitConf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, "testShortCircuitReadFromClientWithoutShm_clientContext");
        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) FileSystem.get(build.getURI(0), createShortCircuitConf);
        DFSTestUtil.createFile(distributedFileSystem, new Path("/test_file"), 4000L, (short) 1, 1027564L);
        Assert.assertTrue(Arrays.equals(DFSTestUtil.readFileBuffer(distributedFileSystem, new Path("/test_file")), DFSTestUtil.calculateFileContentsFromSeed(1027564L, 4000)));
        Assert.assertEquals((Object) null, distributedFileSystem.dfs.getClientContext().getShortCircuitCache().getDfsClientShmManager());
        build.shutdown();
    }

    @Test
    public void testShortCircuitCacheShutdown() throws Exception {
        Configuration createShortCircuitConf = createShortCircuitConf("testShortCircuitCacheShutdown", new TemporarySocketDirectory());
        createShortCircuitConf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
        Configuration configuration = new Configuration(createShortCircuitConf);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        MiniDFSCluster build = new MiniDFSCluster.Builder(configuration).numDataNodes(1).build();
        build.waitActive();
        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) FileSystem.get(build.getURI(0), createShortCircuitConf);
        DFSTestUtil.createFile(distributedFileSystem, new Path("/test_file"), 4000L, (short) 1, 1027564L);
        Assert.assertTrue(Arrays.equals(DFSTestUtil.readFileBuffer(distributedFileSystem, new Path("/test_file")), DFSTestUtil.calculateFileContentsFromSeed(1027564L, 4000)));
        ShortCircuitCache shortCircuitCache = distributedFileSystem.dfs.getClientContext().getShortCircuitCache();
        shortCircuitCache.close();
        Assert.assertTrue(shortCircuitCache.getDfsClientShmManager().getDomainSocketWatcher().isClosed());
        build.shutdown();
    }

    @Test(timeout = YarnConfiguration.DEFAULT_NM_DISK_HEALTH_CHECK_INTERVAL_MS)
    public void testPurgingClosedReplicas() throws Exception {
        BlockReaderTestUtil.enableBlockReaderFactoryTracing();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        DFSInputStream.tcpReadsDisabledForTesting = true;
        BlockReaderFactory.createShortCircuitReplicaInfoCallback = new ShortCircuitCache.ShortCircuitReplicaCreator() { // from class: org.apache.hadoop.hdfs.TestBlockReaderFactory.6
            @Override // org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator
            public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
                atomicInteger.incrementAndGet();
                return null;
            }
        };
        TemporarySocketDirectory temporarySocketDirectory = new TemporarySocketDirectory();
        Configuration createShortCircuitConf = createShortCircuitConf("testPurgingClosedReplicas", temporarySocketDirectory);
        final MiniDFSCluster build = new MiniDFSCluster.Builder(createShortCircuitConf).numDataNodes(1).build();
        build.waitActive();
        DistributedFileSystem fileSystem = build.getFileSystem();
        DFSTestUtil.createFile((DistributedFileSystem) FileSystem.get(build.getURI(0), createShortCircuitConf), new Path("/test_file"), 4095L, (short) 1, 1027552L);
        final Semaphore semaphore = new Semaphore(0);
        final LocatedBlock locatedBlock = build.getNameNode().getRpcServer().getBlockLocations("/test_file", 0L, 4095L).getLocatedBlocks().get(0);
        final byte[] bArr = new byte[4095];
        Thread thread = new Thread(new Runnable() { // from class: org.apache.hadoop.hdfs.TestBlockReaderFactory.7
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    BlockReader blockReader = null;
                    try {
                        try {
                            try {
                                blockReader = BlockReaderTestUtil.getBlockReader(build, locatedBlock, 0, 4095);
                                semaphore.release();
                                try {
                                    blockReader.readAll(bArr, 0, 4095);
                                    semaphore.acquireUninterruptibly();
                                    if (blockReader != null) {
                                        blockReader.close();
                                    }
                                    TestBlockReaderFactory.LOG.info("read another 4095 bytes.");
                                } catch (Throwable th) {
                                    semaphore.acquireUninterruptibly();
                                    throw th;
                                }
                            } catch (ClosedByInterruptException e) {
                                TestBlockReaderFactory.LOG.info("got the expected ClosedByInterruptException", e);
                                semaphore.release();
                                if (blockReader != null) {
                                    blockReader.close();
                                }
                                return;
                            }
                        } catch (Throwable th2) {
                            if (blockReader != null) {
                                blockReader.close();
                            }
                            throw th2;
                        }
                    } catch (Throwable th3) {
                        TestBlockReaderFactory.LOG.error("getBlockReader failure", th3);
                        atomicBoolean.set(true);
                        semaphore.release();
                        return;
                    }
                }
            }
        });
        thread.start();
        while (thread.isAlive()) {
            semaphore.acquireUninterruptibly();
            thread.interrupt();
            semaphore.release();
        }
        Assert.assertFalse(atomicBoolean.get());
        BlockReader blockReader = null;
        try {
            blockReader = BlockReaderTestUtil.getBlockReader(build, locatedBlock, 0, 4095);
            blockReader.readFully(bArr, 0, 4095);
            if (blockReader != null) {
                blockReader.close();
            }
            Assert.assertTrue(Arrays.equals(bArr, DFSTestUtil.calculateFileContentsFromSeed(1027552L, 4095)));
            Assert.assertEquals(2L, atomicInteger.get());
            fileSystem.close();
            build.shutdown();
            temporarySocketDirectory.close();
        } catch (Throwable th) {
            if (blockReader != null) {
                blockReader.close();
            }
            throw th;
        }
    }
}
