package org.apache.flink.runtime.jobmanager;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.dispatcher.NoOpJobGraphListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.class */
public class ZooKeeperJobGraphsStoreITCase extends TestLogger {
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
    private static final RetrievableStateStorageHelper<JobGraph> localStateStorage = jobGraph -> {
        return new RetrievableStreamStateHandle(new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), InstantiationUtil.serializeObject(jobGraph)));
    };

    @AfterClass
    public static void tearDown() throws Exception {
        ZooKeeper.shutdown();
    }

    @Before
    public void cleanUp() throws Exception {
        ZooKeeper.deleteAll();
    }

    @Test
    public void testPutAndRemoveJobGraph() throws Exception {
        JobGraphStore createZooKeeperJobGraphStore = createZooKeeperJobGraphStore("/testPutAndRemoveJobGraph");
        try {
            JobGraphStore.JobGraphListener jobGraphListener = (JobGraphStore.JobGraphListener) Mockito.mock(JobGraphStore.JobGraphListener.class);
            createZooKeeperJobGraphStore.start(jobGraphListener);
            JobGraph createJobGraph = createJobGraph(new JobID(), "JobName");
            Assert.assertEquals(0L, createZooKeeperJobGraphStore.getJobIds().size());
            createZooKeeperJobGraphStore.putJobGraph(createJobGraph);
            Collection jobIds = createZooKeeperJobGraphStore.getJobIds();
            Assert.assertEquals(1L, jobIds.size());
            verifyJobGraphs(createJobGraph, createZooKeeperJobGraphStore.recoverJobGraph((JobID) jobIds.iterator().next()));
            JobGraph createJobGraph2 = createJobGraph(createJobGraph.getJobID(), "Updated JobName");
            createZooKeeperJobGraphStore.putJobGraph(createJobGraph2);
            Collection jobIds2 = createZooKeeperJobGraphStore.getJobIds();
            Assert.assertEquals(1L, jobIds2.size());
            verifyJobGraphs(createJobGraph2, createZooKeeperJobGraphStore.recoverJobGraph((JobID) jobIds2.iterator().next()));
            createZooKeeperJobGraphStore.removeJobGraph(createJobGraph2.getJobID());
            Assert.assertEquals(0L, createZooKeeperJobGraphStore.getJobIds().size());
            ((JobGraphStore.JobGraphListener) Mockito.verify(jobGraphListener, Mockito.atMost(1))).onAddedJobGraph((JobID) Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener) Mockito.verify(jobGraphListener, Mockito.never())).onRemovedJobGraph((JobID) Matchers.any(JobID.class));
            createZooKeeperJobGraphStore.removeJobGraph(createJobGraph2.getJobID());
            createZooKeeperJobGraphStore.stop();
        } catch (Throwable th) {
            createZooKeeperJobGraphStore.stop();
            throw th;
        }
    }

    @Nonnull
    private JobGraphStore createZooKeeperJobGraphStore(String str) throws Exception {
        CuratorFramework client = ZooKeeper.getClient();
        client.newNamespaceAwareEnsurePath(str).ensure(client.getZookeeperClient());
        CuratorFramework usingNamespace = client.usingNamespace(client.getNamespace() + str);
        return new DefaultJobGraphStore(new ZooKeeperStateHandleStore(usingNamespace, localStateStorage), new ZooKeeperJobGraphStoreWatcher(new PathChildrenCache(usingNamespace, "/", false)), ZooKeeperJobGraphStoreUtil.INSTANCE);
    }

    @Test
    public void testRecoverJobGraphs() throws Exception {
        JobGraphStore createZooKeeperJobGraphStore = createZooKeeperJobGraphStore("/testRecoverJobGraphs");
        try {
            JobGraphStore.JobGraphListener jobGraphListener = (JobGraphStore.JobGraphListener) Mockito.mock(JobGraphStore.JobGraphListener.class);
            createZooKeeperJobGraphStore.start(jobGraphListener);
            HashMap hashMap = new HashMap();
            JobID[] jobIDArr = {new JobID(), new JobID(), new JobID()};
            hashMap.put(jobIDArr[0], createJobGraph(jobIDArr[0]));
            hashMap.put(jobIDArr[1], createJobGraph(jobIDArr[1]));
            hashMap.put(jobIDArr[2], createJobGraph(jobIDArr[2]));
            Iterator it = hashMap.values().iterator();
            while (it.hasNext()) {
                createZooKeeperJobGraphStore.putJobGraph((JobGraph) it.next());
            }
            Collection jobIds = createZooKeeperJobGraphStore.getJobIds();
            Assert.assertEquals(hashMap.size(), jobIds.size());
            Iterator it2 = jobIds.iterator();
            while (it2.hasNext()) {
                JobGraph recoverJobGraph = createZooKeeperJobGraphStore.recoverJobGraph((JobID) it2.next());
                Assert.assertTrue(hashMap.containsKey(recoverJobGraph.getJobID()));
                verifyJobGraphs((JobGraph) hashMap.get(recoverJobGraph.getJobID()), recoverJobGraph);
                createZooKeeperJobGraphStore.removeJobGraph(recoverJobGraph.getJobID());
            }
            Assert.assertEquals(0L, createZooKeeperJobGraphStore.getJobIds().size());
            ((JobGraphStore.JobGraphListener) Mockito.verify(jobGraphListener, Mockito.atMost(hashMap.size()))).onAddedJobGraph((JobID) Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener) Mockito.verify(jobGraphListener, Mockito.never())).onRemovedJobGraph((JobID) Matchers.any(JobID.class));
            createZooKeeperJobGraphStore.stop();
        } catch (Throwable th) {
            createZooKeeperJobGraphStore.stop();
            throw th;
        }
    }

    @Test
    public void testConcurrentAddJobGraph() throws Exception {
        JobGraphStore jobGraphStore = null;
        JobGraphStore jobGraphStore2 = null;
        try {
            jobGraphStore = createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
            jobGraphStore2 = createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
            JobGraph createJobGraph = createJobGraph(new JobID());
            JobGraph createJobGraph2 = createJobGraph(new JobID());
            JobGraphStore.JobGraphListener jobGraphListener = (JobGraphStore.JobGraphListener) Mockito.mock(JobGraphStore.JobGraphListener.class);
            final JobID[] jobIDArr = new JobID[1];
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            ((JobGraphStore.JobGraphListener) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphsStoreITCase.1
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Void m180answer(InvocationOnMock invocationOnMock) throws Throwable {
                    jobIDArr[0] = (JobID) invocationOnMock.getArguments()[0];
                    countDownLatch.countDown();
                    return null;
                }
            }).when(jobGraphListener)).onAddedJobGraph((JobID) Matchers.any(JobID.class));
            jobGraphStore.start(jobGraphListener);
            jobGraphStore2.start(NoOpJobGraphListener.INSTANCE);
            jobGraphStore.putJobGraph(createJobGraph);
            ((JobGraphStore.JobGraphListener) Mockito.verify(jobGraphListener, Mockito.never())).onAddedJobGraph((JobID) Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener) Mockito.verify(jobGraphListener, Mockito.never())).onRemovedJobGraph((JobID) Matchers.any(JobID.class));
            jobGraphStore2.putJobGraph(createJobGraph2);
            countDownLatch.await();
            ((JobGraphStore.JobGraphListener) Mockito.verify(jobGraphListener, Mockito.times(1))).onAddedJobGraph((JobID) Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener) Mockito.verify(jobGraphListener, Mockito.never())).onRemovedJobGraph((JobID) Matchers.any(JobID.class));
            Assert.assertEquals(createJobGraph2.getJobID(), jobIDArr[0]);
            if (jobGraphStore != null) {
                jobGraphStore.stop();
            }
            if (jobGraphStore2 != null) {
                jobGraphStore2.stop();
            }
        } catch (Throwable th) {
            if (jobGraphStore != null) {
                jobGraphStore.stop();
            }
            if (jobGraphStore2 != null) {
                jobGraphStore2.stop();
            }
            throw th;
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
        JobGraphStore createZooKeeperJobGraphStore = createZooKeeperJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
        JobGraphStore createZooKeeperJobGraphStore2 = createZooKeeperJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
        createZooKeeperJobGraphStore.start(NoOpJobGraphListener.INSTANCE);
        createZooKeeperJobGraphStore2.start(NoOpJobGraphListener.INSTANCE);
        JobGraph createJobGraph = createJobGraph(new JobID());
        createZooKeeperJobGraphStore.putJobGraph(createJobGraph);
        createZooKeeperJobGraphStore2.putJobGraph(createJobGraph);
    }

    @Test
    public void testJobGraphRemovalFailureAndLockRelease() throws Exception {
        JobGraphStore createZooKeeperJobGraphStore = createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
        JobGraphStore createZooKeeperJobGraphStore2 = createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
        TestingJobGraphListener testingJobGraphListener = new TestingJobGraphListener();
        createZooKeeperJobGraphStore.start(testingJobGraphListener);
        createZooKeeperJobGraphStore2.start(testingJobGraphListener);
        JobGraph jobGraph = new JobGraph(new JobVertex[0]);
        createZooKeeperJobGraphStore.putJobGraph(jobGraph);
        JobGraph recoverJobGraph = createZooKeeperJobGraphStore2.recoverJobGraph(jobGraph.getJobID());
        Assert.assertThat(recoverJobGraph, org.hamcrest.Matchers.is(org.hamcrest.Matchers.notNullValue()));
        try {
            createZooKeeperJobGraphStore2.removeJobGraph(recoverJobGraph.getJobID());
            Assert.fail("It should not be possible to remove the JobGraph since the first store still has a lock on it.");
        } catch (Exception e) {
        }
        createZooKeeperJobGraphStore.stop();
        createZooKeeperJobGraphStore2.removeJobGraph(recoverJobGraph.getJobID());
        Assert.assertThat(createZooKeeperJobGraphStore2.recoverJobGraph(recoverJobGraph.getJobID()), org.hamcrest.Matchers.is(org.hamcrest.Matchers.nullValue()));
        createZooKeeperJobGraphStore2.stop();
    }

    private JobGraph createJobGraph(JobID jobID) {
        return createJobGraph(jobID, "Test JobGraph");
    }

    private JobGraph createJobGraph(JobID jobID, String str) {
        JobGraph jobGraph = new JobGraph(jobID, str);
        JobVertex jobVertex = new JobVertex("Test JobVertex");
        jobVertex.setParallelism(1);
        jobGraph.addVertex(jobVertex);
        return jobGraph;
    }

    private void verifyJobGraphs(JobGraph jobGraph, JobGraph jobGraph2) {
        Assert.assertEquals(jobGraph.getName(), jobGraph2.getName());
        Assert.assertEquals(jobGraph.getJobID(), jobGraph2.getJobID());
    }
}
