package org.apache.hadoop.hdfs.server.namenode;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.JournalSet;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.class */
public class TestEditLogRace {
    /** Logger for this test class; assigned in the static initializer. */
    private static final Log LOG;

    /** Directory used for both the name and edits dirs in {@link #getConf()}; assigned in the static initializer. */
    private static final String NAME_DIR;

    /** Number of concurrent transaction worker threads. */
    static final int NUM_THREADS = 16;

    /** Number of edit-log rolls (presumably the iteration count for the rolling test). */
    static final int NUM_ROLLS = 30;

    /** Number of namespace saves (presumably the iteration count for the save-namespace test). */
    static final int NUM_SAVE_IMAGE = 30;

    /** Workers started by {@code startTransactionWorkers} and stopped by {@code stopTransactionWorkers}. */
    private final List<Transactions> workers = new ArrayList<>();

    /** Number of data nodes for the mini cluster. */
    private static final int NUM_DATA_NODES = 1;

    /** Blocking time in seconds (presumably how long the stubbed flush/logSync calls sleep). */
    private static final int BLOCK_TIME = 10;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/namenode/TestEditLogRace$Transactions.class */
    public static class Transactions implements Runnable {
        final NamenodeProtocols nn;
        short replication = 3;
        long blockSize = 64;
        volatile boolean stopped = false;
        volatile Thread thr;
        final AtomicReference<Throwable> caught;

        Transactions(NamenodeProtocols namenodeProtocols, AtomicReference<Throwable> atomicReference) {
            this.nn = namenodeProtocols;
            this.caught = atomicReference;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.thr = Thread.currentThread();
            FsPermission fsPermission = new FsPermission((short) 511);
            int i = 0;
            while (!this.stopped) {
                try {
                    String str = "/thr-" + this.thr.getId() + "-dir-" + i;
                    this.nn.mkdirs(str, fsPermission, true);
                    this.nn.delete(str, true);
                } catch (SafeModeException e) {
                } catch (Throwable th) {
                    TestEditLogRace.LOG.warn("Got error in transaction thread", th);
                    this.caught.compareAndSet(null, th);
                    return;
                }
                i++;
            }
        }

        public void stop() {
            this.stopped = true;
        }

        public Thread getThread() {
            return this.thr;
        }
    }

    /**
     * Launches {@code NUM_THREADS} worker threads, each running a
     * {@link Transactions} loop against the given RPC handle, and records
     * every worker in {@code workers}.
     *
     * @param namenodeProtocols RPC handle handed to each worker
     * @param atomicReference   shared slot handed to each worker for error reporting
     */
    private void startTransactionWorkers(NamenodeProtocols namenodeProtocols, AtomicReference<Throwable> atomicReference) {
        int index = 0;
        while (index < NUM_THREADS) {
            Transactions worker = new Transactions(namenodeProtocols, atomicReference);
            Thread runner = new Thread(worker, "TransactionThread-" + index);
            runner.start();
            workers.add(worker);
            index++;
        }
    }

    /**
     * Signals every registered worker to stop and then waits for each worker
     * thread to terminate.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining
     * workers are still visited and the interrupt status is restored before
     * returning, so the interruption is not lost to the caller.
     */
    private void stopTransactionWorkers() {
        // Signal all workers first so that they wind down concurrently.
        for (Transactions worker : this.workers) {
            worker.stop();
        }

        boolean interrupted = false;
        for (Transactions worker : this.workers) {
            Thread thread = worker.getThread();
            if (thread == null) {
                // run() has not started yet, so there is no thread to join.
                continue;
            }
            try {
                thread.join();
            } catch (InterruptedException e) {
                // Defer re-interrupting: setting the flag now would make every
                // following join() fail immediately.
                interrupted = true;
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Rolls the edit log {@code NUM_ROLLS} times while transaction workers
     * are running and, after each roll, checks that the finalized segment
     * loads with the same edit count from every edits directory, that the
     * next segment starts at the expected transaction id, and that its
     * in-progress file exists.
     *
     * <p>Cleanup always stops the workers, closes the file system and shuts
     * the cluster down, and only afterwards reports a worker failure.
     * Previously a worker failure was rethrown before cleanup, leaving the
     * mini cluster running.
     */
    @Test
    public void testEditLogRolling() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = null;
        DistributedFileSystem fs = null;
        AtomicReference<Throwable> caughtErr = new AtomicReference<>();
        try {
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
            cluster.waitActive();
            fs = cluster.getFileSystem();
            NamenodeProtocols nn = cluster.getNameNode().getRpcServer();
            FSImage fsImage = cluster.getNamesystem().getFSImage();
            Storage.StorageDirectory sd = fsImage.getStorage().getStorageDir(0);
            startTransactionWorkers(nn, caughtErr);

            long previousLogTxId = 1;
            // Stop rolling as soon as any worker has reported a failure.
            for (int i = 0; i < NUM_ROLLS && caughtErr.get() == null; i++) {
                try {
                    Thread.sleep(20L);
                } catch (InterruptedException ignored) {
                    // Best-effort pause between rolls; carry on with the next roll.
                }
                LOG.info("Starting roll " + i + ".");
                long nextLogTxId = nn.rollEditLog().curSegmentTxId;
                String finalizedName = NNStorage.getFinalizedEditsFileName(previousLogTxId, nextLogTxId - 1);
                // The edits in the finalized segment advance the expected start of the next one.
                previousLogTxId += verifyEditLogs(cluster.getNamesystem(), fsImage, finalizedName, previousLogTxId);
                Assert.assertEquals(previousLogTxId, nextLogTxId);
                File expectedInProgress = NNStorage.getInProgressEditsFile(sd, previousLogTxId);
                Assert.assertTrue("Expect " + expectedInProgress + " to exist", expectedInProgress.exists());
            }
        } finally {
            stopTransactionWorkers();
            // Release the file system and cluster before reporting worker errors
            // so that a failed run does not leak the mini cluster.
            try {
                if (fs != null) {
                    fs.close();
                }
            } finally {
                if (cluster != null) {
                    cluster.shutdown();
                }
            }
            Throwable workerError = caughtErr.get();
            if (workerError != null) {
                throw new RuntimeException(workerError);
            }
        }
    }

    /**
     * Loads the named edit log file from every EDITS storage directory and
     * checks that each copy yields the same number of edits.
     *
     * <p>Each input stream is closed after loading; previously it was left open.
     *
     * @param fSNamesystem namesystem passed to the {@link FSEditLogLoader}
     * @param fSImage      image whose storage directories are scanned
     * @param str          file name of the edit log inside each directory's current dir
     * @param j            transaction id passed to the loader and to {@code loadFSEdits}
     * @return the number of edits reported for the file (identical across directories)
     * @throws IOException if a file cannot be opened, loaded or closed
     */
    private long verifyEditLogs(FSNamesystem fSNamesystem, FSImage fSImage, String str, long j) throws IOException {
        long expectedEdits = -1;
        for (Storage.StorageDirectory sd : fSImage.getStorage().dirIterable(NNStorage.NameNodeDirType.EDITS)) {
            File editFile = new File(sd.getCurrentDir(), str);
            System.out.println("Verifying file: " + editFile);
            FSEditLogLoader loader = new FSEditLogLoader(fSNamesystem, j);
            long numEdits;
            try (EditLogFileInputStream in = new EditLogFileInputStream(editFile)) {
                numEdits = loader.loadFSEdits(in, j);
            }
            System.out.println("Number of edits: " + numEdits);
            Assert.assertTrue("Edit count of " + editFile + " differs from the previous directory",
                    expectedEdits == -1 || numEdits == expectedEdits);
            expectedEdits = numEdits;
        }
        // -1 here means the loop never ran, i.e. there was no edits directory.
        Assert.assertTrue("No edits directory was verified", expectedEdits != -1);
        return expectedEdits;
    }

    /**
     * Repeatedly enters safe mode, saves the namespace and leaves safe mode
     * while transaction workers are running. Before each save the in-progress
     * edit log is verified; after it, the finalized log is verified and the
     * checkpoint transaction id is checked to be one less than the last
     * written transaction id.
     *
     * <p>Cleanup always stops the workers, closes the file system and shuts
     * the cluster down, and only afterwards reports a worker failure.
     * Previously a worker failure was rethrown before cleanup, leaving the
     * mini cluster running.
     */
    @Test
    public void testSaveNamespace() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        MiniDFSCluster cluster = null;
        DistributedFileSystem fs = null;
        AtomicReference<Throwable> caughtErr = new AtomicReference<>();
        try {
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
            cluster.waitActive();
            fs = cluster.getFileSystem();
            FSNamesystem namesystem = cluster.getNamesystem();
            NamenodeProtocols nn = cluster.getNameNodeRpc();
            FSImage fsImage = namesystem.getFSImage();
            FSEditLog editLog = fsImage.getEditLog();
            startTransactionWorkers(nn, caughtErr);

            // Stop saving as soon as any worker has reported a failure.
            for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
                try {
                    Thread.sleep(20L);
                } catch (InterruptedException ignored) {
                    // Best-effort pause between saves; carry on with the next save.
                }
                LOG.info("Save " + i + ": entering safe mode");
                namesystem.enterSafeMode(false);

                // The current log starts with the first transaction after the checkpoint.
                long logStartTxId = fsImage.getStorage().getMostRecentCheckpointTxId() + 1;
                verifyEditLogs(namesystem, fsImage, NNStorage.getInProgressEditsFileName(logStartTxId), logStartTxId);

                LOG.info("Save " + i + ": saving namespace");
                namesystem.saveNamespace();
                LOG.info("Save " + i + ": leaving safemode");

                long savedImageTxId = fsImage.getStorage().getMostRecentCheckpointTxId();
                verifyEditLogs(namesystem, fsImage,
                        NNStorage.getFinalizedEditsFileName(logStartTxId, savedImageTxId), logStartTxId);
                Assert.assertEquals(fsImage.getStorage().getMostRecentCheckpointTxId(),
                        editLog.getLastWrittenTxId() - 1);

                namesystem.leaveSafeMode();
                LOG.info("Save " + i + ": complete");
            }
        } finally {
            stopTransactionWorkers();
            // Release the file system and cluster before reporting worker errors
            // so that a failed run does not leak the mini cluster.
            try {
                if (fs != null) {
                    fs.close();
                }
            } finally {
                if (cluster != null) {
                    cluster.shutdown();
                }
            }
            Throwable workerError = caughtErr.get();
            if (workerError != null) {
                throw new RuntimeException(workerError);
            }
        }
    }

    /**
     * Builds the configuration used by the tests that load an
     * {@link FSNamesystem} directly from disk: default file system
     * {@code hdfs://localhost:0}, NameNode HTTP address on the wildcard
     * interface with port 0, name and edits directories both set to
     * {@code NAME_DIR}, and permission checking disabled.
     *
     * @return a freshly created configuration
     */
    private Configuration getConf() {
        HdfsConfiguration conf = new HdfsConfiguration();
        FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
        // Spelled out literally rather than borrowed from a YARN NodeManager
        // default: this is an HDFS NameNode setting and must not depend on YARN.
        conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
        conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
        conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
        conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
        return conf;
    }

    /**
     * Checks that entering safe mode waits for an edit whose stream flush is
     * still in progress.
     *
     * <p>The first journal's output stream is replaced by a Mockito spy whose
     * {@code flush()} signals a latch and then sleeps {@code BLOCK_TIME}
     * seconds when called from the edit thread. The main thread waits for
     * the latch, enters safe mode, asserts that this took at least
     * {@code BLOCK_TIME - 1} seconds, saves the namespace and finally
     * verifies the edit counts of the finalized and in-progress logs.
     */
    @Test
    public void testSaveImageWhileSyncInProgress() throws Exception {
        Configuration conf = getConf();
        NameNode.initMetrics(conf, HdfsServerConstants.NamenodeRole.NAMENODE);
        DFSTestUtil.formatNameNode(conf);
        final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
        try {
            FSImage fsImage = namesystem.getFSImage();
            JournalSet.JournalAndStream journal = fsImage.getEditLog().getJournals().get(0);
            EditLogFileOutputStream spyStream = Mockito.spy((EditLogFileOutputStream) journal.getCurrentStream());
            journal.setCurrentStreamForTests(spyStream);

            final AtomicReference<Throwable> deferredException = new AtomicReference<>();
            final CountDownLatch waitToEnterFlush = new CountDownLatch(1);

            final Thread doAnEditThread = new Thread() {
                @Override
                public void run() {
                    try {
                        LOG.info("Starting mkdirs");
                        namesystem.mkdirs("/test",
                                new PermissionStatus("test", "test", new FsPermission((short) 0755)), true);
                        LOG.info("mkdirs complete");
                    } catch (Throwable failure) {
                        LOG.fatal("Got exception", failure);
                        deferredException.set(failure);
                        // Release the main thread so it does not wait forever
                        // for a flush that will never happen.
                        waitToEnterFlush.countDown();
                    }
                }
            };

            Answer<Void> blockingFlush = new Answer<Void>() {
                @Override
                public Void answer(InvocationOnMock invocation) throws Throwable {
                    LOG.info("Flush called");
                    // Only the edit thread is delayed; other callers flush right away.
                    if (Thread.currentThread() == doAnEditThread) {
                        LOG.info("edit thread: Telling main thread we made it to flush section...");
                        waitToEnterFlush.countDown();
                        LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
                        Thread.sleep(BLOCK_TIME * 1000L);
                        LOG.info("Going through to flush. This will allow the main thread to continue.");
                    }
                    invocation.callRealMethod();
                    LOG.info("Flush complete");
                    return null;
                }
            };
            Mockito.doAnswer(blockingFlush).when(spyStream).flush();

            doAnEditThread.start();
            LOG.info("Main thread: waiting to enter flush...");
            waitToEnterFlush.await();
            Assert.assertNull(deferredException.get());
            LOG.info("Main thread: detected that logSync is in unsynchronized section.");
            LOG.info("Trying to enter safe mode.");
            LOG.info("This should block for " + BLOCK_TIME + "sec, since flush will sleep that long");

            long startTime = Time.now();
            namesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
            long endTime = Time.now();
            LOG.info("Entered safe mode");
            // Allow one second of slack below the full blocking time.
            Assert.assertTrue(endTime - startTime > (BLOCK_TIME - 1) * 1000);

            namesystem.saveNamespace();
            LOG.info("Joining on edit thread...");
            doAnEditThread.join();
            Assert.assertNull(deferredException.get());

            Assert.assertEquals(3L,
                    verifyEditLogs(namesystem, fsImage, NNStorage.getFinalizedEditsFileName(1L, 3L), 1L));
            Assert.assertEquals(1L,
                    verifyEditLogs(namesystem, fsImage, NNStorage.getInProgressEditsFileName(4L), 4L));
        } finally {
            LOG.info("Closing nn");
            if (namesystem != null) {
                namesystem.close();
            }
        }
    }

    /**
     * Checks that entering safe mode waits for an edit that has been made
     * but whose {@code logSync()} has not run yet.
     *
     * <p>The namesystem's edit log is replaced by a Mockito spy whose
     * {@code logSync()} signals a latch and then sleeps {@code BLOCK_TIME}
     * seconds when called from the edit thread. The main thread waits for
     * the latch, enters safe mode, asserts that this took at least
     * {@code BLOCK_TIME - 1} seconds, saves the namespace and finally
     * verifies the edit counts of the finalized and in-progress logs.
     */
    @Test
    public void testSaveRightBeforeSync() throws Exception {
        Configuration conf = getConf();
        NameNode.initMetrics(conf, HdfsServerConstants.NamenodeRole.NAMENODE);
        DFSTestUtil.formatNameNode(conf);
        final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
        try {
            FSImage fsImage = namesystem.getFSImage();
            FSEditLog spyEditLog = Mockito.spy(fsImage.getEditLog());
            DFSTestUtil.setEditLogForTesting(namesystem, spyEditLog);

            final AtomicReference<Throwable> deferredException = new AtomicReference<>();
            final CountDownLatch waitToEnterSync = new CountDownLatch(1);

            final Thread doAnEditThread = new Thread() {
                @Override
                public void run() {
                    try {
                        LOG.info("Starting mkdirs");
                        namesystem.mkdirs("/test",
                                new PermissionStatus("test", "test", new FsPermission((short) 0755)), true);
                        LOG.info("mkdirs complete");
                    } catch (Throwable failure) {
                        LOG.fatal("Got exception", failure);
                        deferredException.set(failure);
                        // Release the main thread so it does not wait forever
                        // for a logSync that will never happen.
                        waitToEnterSync.countDown();
                    }
                }
            };

            Answer<Void> blockingSync = new Answer<Void>() {
                @Override
                public Void answer(InvocationOnMock invocation) throws Throwable {
                    LOG.info("logSync called");
                    // Only the edit thread is delayed; other callers sync right away.
                    if (Thread.currentThread() == doAnEditThread) {
                        LOG.info("edit thread: Telling main thread we made it just before logSync...");
                        waitToEnterSync.countDown();
                        LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
                        Thread.sleep(BLOCK_TIME * 1000L);
                        LOG.info("Going through to logSync. This will allow the main thread to continue.");
                    }
                    invocation.callRealMethod();
                    LOG.info("logSync complete");
                    return null;
                }
            };
            Mockito.doAnswer(blockingSync).when(spyEditLog).logSync();

            doAnEditThread.start();
            LOG.info("Main thread: waiting to just before logSync...");
            waitToEnterSync.await();
            Assert.assertNull(deferredException.get());
            LOG.info("Main thread: detected that logSync about to be called.");
            LOG.info("Trying to enter safe mode.");
            LOG.info("This should block for " + BLOCK_TIME + "sec, since we have pending edits");

            long startTime = Time.now();
            namesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
            long endTime = Time.now();
            LOG.info("Entered safe mode");
            // Allow one second of slack below the full blocking time.
            Assert.assertTrue(endTime - startTime > (BLOCK_TIME - 1) * 1000);

            namesystem.saveNamespace();
            LOG.info("Joining on edit thread...");
            doAnEditThread.join();
            Assert.assertNull(deferredException.get());

            Assert.assertEquals(3L,
                    verifyEditLogs(namesystem, fsImage, NNStorage.getFinalizedEditsFileName(1L, 3L), 1L));
            Assert.assertEquals(1L,
                    verifyEditLogs(namesystem, fsImage, NNStorage.getInProgressEditsFileName(4L), 4L));
        } finally {
            LOG.info("Closing nn");
            if (namesystem != null) {
                namesystem.close();
            }
        }
    }

    static {
        ((Log4JLogger) FSEditLog.LOG).getLogger().setLevel(Level.ALL);
        LOG = LogFactory.getLog(TestEditLogRace.class);
        NAME_DIR = MiniDFSCluster.getBaseDirectory() + "name1";
    }
}
