package org.apache.hadoop.yarn.server.resourcemanager.recovery;

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.class */
public class TestRMStateStore {
    /** Commons-logging logger keyed to this test class. */
    public static final Log LOG = LogFactory.getLog(TestRMStateStore.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore$RMStateStoreHelper.class */
    /**
     * Hooks that adapt the generic {@code testRMStateStore} scenario to one
     * concrete {@link RMStateStore} implementation.
     */
    public interface RMStateStoreHelper {
        /**
         * Supplies a store instance for the scenario. {@code testRMStateStore}
         * calls this twice: once for the store it writes to, and once more
         * (after closing the first) for the store it reloads state from.
         *
         * @return the store to exercise
         * @throws Exception if the store cannot be created
         */
        RMStateStore getRMStateStore() throws Exception;

        /**
         * Invoked after an application has been removed and before the first
         * store is closed, so an implementation can add extra attempt data.
         *
         * @param rMStateStore the store currently being written to
         * @param testDispatcher the dispatcher registered on that store
         * @throws Exception if storing fails
         */
        void addOrphanAttemptIfNeeded(RMStateStore rMStateStore, TestDispatcher testDispatcher) throws Exception;

        /**
         * Implementation-specific check of the backing storage; the scenario
         * asserts that it returns {@code true} after verifying recovered state.
         *
         * @return {@code true} when the final stored state is as expected
         * @throws Exception if the backing storage cannot be inspected
         */
        boolean isFinalStateValid() throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore$TestDispatcher.class */
    public class TestDispatcher implements Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
        ApplicationAttemptId attemptId;
        Exception storedException;
        boolean notified = false;

        TestDispatcher() {
        }

        public void register(Class<? extends Enum> cls, EventHandler eventHandler) {
        }

        public void handle(RMAppAttemptStoredEvent rMAppAttemptStoredEvent) {
            Assert.assertEquals(this.attemptId, rMAppAttemptStoredEvent.getApplicationAttemptId());
            Assert.assertEquals(this.storedException, rMAppAttemptStoredEvent.getStoredException());
            this.notified = true;
            synchronized (this) {
                notifyAll();
            }
        }

        public EventHandler getEventHandler() {
            return this;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore$TestFSRMStateStoreTester.class */
    /**
     * {@link RMStateStoreHelper} backed by a {@link FileSystemRMStateStore}
     * whose working directory is {@code /Test} on the supplied mini DFS cluster.
     */
    class TestFSRMStateStoreTester implements RMStateStoreHelper {
        /** Fully qualified (cluster URI + {@code /Test}) working directory. */
        Path workingDirPathURI;
        /** Most recently created store. */
        FileSystemRMStateStore store;
        /** Cluster hosting the working directory. */
        MiniDFSCluster cluster;

        /**
         * Store subclass that initialises itself on construction and checks the
         * resulting working path equals the tester's expected directory.
         */
        class TestFileSystemRMStore extends FileSystemRMStateStore {
            TestFileSystemRMStore(Configuration configuration) throws Exception {
                init(configuration);
                Path expectedWorkingPath = TestFSRMStateStoreTester.this.workingDirPathURI;
                Assert.assertTrue(expectedWorkingPath.equals(this.fsWorkingPath));
            }
        }

        /**
         * Creates {@code /Test} on the cluster's file system, records its fully
         * qualified path, and closes the file system handle afterwards.
         *
         * @param miniDFSCluster running cluster to store state on
         * @throws Exception if the directory cannot be created
         */
        public TestFSRMStateStoreTester(MiniDFSCluster miniDFSCluster) throws Exception {
            this.cluster = miniDFSCluster;
            Path workingDir = new Path("/Test");
            DistributedFileSystem dfs = miniDFSCluster.getFileSystem();
            dfs.mkdirs(workingDir);
            Path clusterRoot = new Path(miniDFSCluster.getURI());
            this.workingDirPathURI = new Path(clusterRoot, workingDir);
            dfs.close();
        }

        /**
         * Builds a new store configured to use the working directory and
         * remembers it in {@link #store}.
         */
        @Override
        public RMStateStore getRMStateStore() throws Exception {
            YarnConfiguration conf = new YarnConfiguration();
            conf.set("yarn.resourcemanager.fs.rm-state-store.uri", this.workingDirPathURI.toString());
            TestFileSystemRMStore created = new TestFileSystemRMStore(conf);
            this.store = created;
            return created;
        }

        /**
         * Stores an attempt for application 0003, for which the scenario never
         * stores an application record.
         */
        @Override
        public void addOrphanAttemptIfNeeded(RMStateStore rMStateStore, TestDispatcher testDispatcher) throws Exception {
            ApplicationAttemptId orphanAttemptId =
                    ConverterUtils.toApplicationAttemptId("appattempt_1352994193343_0003_000001");
            storeAttempt(rMStateStore, orphanAttemptId, "container_1352994193343_0003_01_000001", testDispatcher);
        }

        /**
         * @return {@code true} when the working directory lists exactly one entry
         */
        @Override
        public boolean isFinalStateValid() throws Exception {
            int entryCount = this.cluster.getFileSystem().listStatus(this.workingDirPathURI).length;
            return entryCount == 1;
        }
    }

    /**
     * Runs the generic state-store scenario against a file-system store hosted
     * on a single-datanode mini DFS cluster.
     *
     * @throws Exception if cluster start-up or the scenario fails
     */
    @Test
    public void testFSRMStateStore() throws Exception {
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).numDataNodes(1).build();
        try {
            testRMStateStore(new TestFSRMStateStoreTester(cluster));
        } finally {
            // Shut down exactly once, whether or not the scenario succeeded.
            cluster.shutdown();
        }
    }

    /**
     * Blocks until the dispatcher reports that a stored-attempt event was
     * handled, then clears the flag so the dispatcher can be reused.
     *
     * <p>Fails the test if no notification arrives within 60 seconds. If the
     * calling thread is interrupted while waiting, waiting continues and the
     * thread's interrupt status is restored before returning.
     *
     * @param testDispatcher dispatcher whose {@code notified} flag is awaited
     */
    void waitNotify(TestDispatcher testDispatcher) {
        final long startMillis = System.currentTimeMillis();
        boolean interrupted = false;
        try {
            // Check, wait and reset under the dispatcher's monitor so the flag is
            // read consistently with the notifyAll() issued by the handler.
            synchronized (testDispatcher) {
                while (!testDispatcher.notified) {
                    if (System.currentTimeMillis() - startMillis > 60000) {
                        Assert.fail("Timed out attempt store notification");
                    }
                    try {
                        testDispatcher.wait(1000L);
                    } catch (InterruptedException e) {
                        // Remember the interrupt; re-asserting it here would make
                        // every subsequent wait() throw immediately.
                        interrupted = true;
                    }
                }
                testDispatcher.notified = false;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stores a mocked application that reports the given id, submit time and a
     * submission context carrying the same id.
     *
     * @param rMStateStore store to write to
     * @param applicationId id of the application to store
     * @param j submit time the mocked application reports
     * @throws Exception if the store rejects the application
     */
    void storeApp(RMStateStore rMStateStore, ApplicationId applicationId, long j) throws Exception {
        ApplicationSubmissionContextPBImpl submissionContext = new ApplicationSubmissionContextPBImpl();
        submissionContext.setApplicationId(applicationId);

        RMApp mockApp = Mockito.mock(RMApp.class);
        Mockito.when(mockApp.getApplicationId()).thenReturn(applicationId);
        Mockito.when(mockApp.getSubmitTime()).thenReturn(j);
        Mockito.when(mockApp.getApplicationSubmissionContext()).thenReturn(submissionContext);

        rMStateStore.storeApplication(mockApp);
    }

    /**
     * Stores a mocked application attempt whose master container has the given
     * id, and waits until the dispatcher has been notified of the store.
     *
     * @param rMStateStore store to write to
     * @param applicationAttemptId id of the attempt to store
     * @param str textual container id for the attempt's master container
     * @param testDispatcher dispatcher primed with the expected attempt id
     * @return the parsed id of the master container
     * @throws Exception if the store rejects the attempt
     */
    ContainerId storeAttempt(RMStateStore rMStateStore, ApplicationAttemptId applicationAttemptId, String str, TestDispatcher testDispatcher) throws Exception {
        ContainerId parsedContainerId = ConverterUtils.toContainerId(str);
        ContainerPBImpl masterContainer = new ContainerPBImpl();
        masterContainer.setId(parsedContainerId);

        RMAppAttempt mockAttempt = Mockito.mock(RMAppAttempt.class);
        Mockito.when(mockAttempt.getAppAttemptId()).thenReturn(applicationAttemptId);
        Mockito.when(mockAttempt.getMasterContainer()).thenReturn(masterContainer);

        // Tell the dispatcher what the upcoming notification must look like.
        testDispatcher.attemptId = applicationAttemptId;
        testDispatcher.storedException = null;

        rMStateStore.storeApplicationAttempt(mockAttempt);
        waitNotify(testDispatcher);

        return masterContainer.getId();
    }

    /**
     * Generic store/remove/reload scenario shared by all store implementations.
     *
     * <ol>
     *   <li>Stores application 0001 with two attempts and application 0002 with
     *       one attempt.</li>
     *   <li>Removes application 0002, lets the helper add an orphan attempt, and
     *       closes the store.</li>
     *   <li>Obtains a second store from the helper, loads its state and asserts
     *       that only application 0001 is present, with both attempts and their
     *       master container ids intact.</li>
     *   <li>Asserts the helper's implementation-specific final-state check.</li>
     * </ol>
     *
     * <p>Both stores are closed in {@code finally} blocks so a failed call or
     * assertion does not leave them open.
     *
     * @param rMStateStoreHelper adapter for the store implementation under test
     * @throws Exception if any store operation fails
     */
    void testRMStateStore(RMStateStoreHelper rMStateStoreHelper) throws Exception {
        final long submitTime = System.currentTimeMillis();

        final ApplicationAttemptId attemptId1 =
                ConverterUtils.toApplicationAttemptId("appattempt_1352994193343_0001_000001");
        final ApplicationId appId1 = attemptId1.getApplicationId();
        final ApplicationAttemptId attemptId2 =
                ConverterUtils.toApplicationAttemptId("appattempt_1352994193343_0001_000002");
        final ApplicationAttemptId attemptIdRemoved =
                ConverterUtils.toApplicationAttemptId("appattempt_1352994193343_0002_000001");
        final ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();

        ContainerId containerId1;
        ContainerId containerId2;

        // Phase 1: populate the store, then remove the second application.
        RMStateStore store = rMStateStoreHelper.getRMStateStore();
        try {
            TestDispatcher dispatcher = new TestDispatcher();
            store.setDispatcher(dispatcher);

            storeApp(store, appId1, submitTime);
            containerId1 = storeAttempt(store, attemptId1, "container_1352994193343_0001_01_000001", dispatcher);
            containerId2 = storeAttempt(store, attemptId2, "container_1352994193343_0001_02_000001", dispatcher);

            storeApp(store, appIdRemoved, submitTime);
            storeAttempt(store, attemptIdRemoved, "container_1352994193343_0002_01_000001", dispatcher);

            // Mock of the application being removed, exposing its single attempt.
            RMApp removedApp = Mockito.mock(RMApp.class);
            Map<ApplicationAttemptId, RMAppAttempt> removedAttempts =
                    new HashMap<ApplicationAttemptId, RMAppAttempt>();
            ApplicationSubmissionContextPBImpl removedContext = new ApplicationSubmissionContextPBImpl();
            removedContext.setApplicationId(appIdRemoved);
            Mockito.when(removedApp.getSubmitTime()).thenReturn(submitTime);
            Mockito.when(removedApp.getApplicationSubmissionContext()).thenReturn(removedContext);
            Mockito.when(removedApp.getAppAttempts()).thenReturn(removedAttempts);
            RMAppAttempt removedAttempt = Mockito.mock(RMAppAttempt.class);
            Mockito.when(removedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved);
            removedAttempts.put(attemptIdRemoved, removedAttempt);

            store.removeApplication(removedApp);
            rMStateStoreHelper.addOrphanAttemptIfNeeded(store, dispatcher);

            // NOTE(review): presumably gives the removal time to complete, since no
            // completion notification for removeApplication is awaited here — confirm.
            Thread.sleep(1000L);
        } finally {
            store.close();
        }

        // Phase 2: reload from a fresh store and verify what survived.
        RMStateStore reloadedStore = rMStateStoreHelper.getRMStateStore();
        try {
            Map<ApplicationId, RMStateStore.ApplicationState> recoveredApps =
                    reloadedStore.loadState().getApplicationState();
            Assert.assertEquals(1, recoveredApps.size());

            RMStateStore.ApplicationState recoveredApp = recoveredApps.get(appId1);
            Assert.assertNotNull(recoveredApp);
            Assert.assertEquals(submitTime, recoveredApp.getSubmitTime());
            Assert.assertEquals(appId1, recoveredApp.getApplicationSubmissionContext().getApplicationId());

            RMStateStore.ApplicationAttemptState recoveredAttempt1 = recoveredApp.getAttempt(attemptId1);
            Assert.assertNotNull(recoveredAttempt1);
            Assert.assertEquals(attemptId1, recoveredAttempt1.getAttemptId());
            Assert.assertEquals(containerId1, recoveredAttempt1.getMasterContainer().getId());

            RMStateStore.ApplicationAttemptState recoveredAttempt2 = recoveredApp.getAttempt(attemptId2);
            Assert.assertNotNull(recoveredAttempt2);
            Assert.assertEquals(attemptId2, recoveredAttempt2.getAttemptId());
            Assert.assertEquals(containerId2, recoveredAttempt2.getMasterContainer().getId());

            Assert.assertTrue(rMStateStoreHelper.isFinalStateValid());
        } finally {
            reloadedStore.close();
        }
    }
}
