package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.class */
public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
    private static final RetrievableStateStorageHelper<SubmittedJobGraph> localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() { // from class: org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphsStoreITCase.1
        public RetrievableStateHandle<SubmittedJobGraph> store(SubmittedJobGraph submittedJobGraph) throws IOException {
            return new RetrievableStreamStateHandle(new TestByteStreamStateHandleDeepCompare(String.valueOf(UUID.randomUUID()), InstantiationUtil.serializeObject(submittedJobGraph)));
        }
    };

    @AfterClass
    public static void tearDown() throws Exception {
        if (ZooKeeper != null) {
            ZooKeeper.shutdown();
        }
    }

    @Before
    public void cleanUp() throws Exception {
        ZooKeeper.deleteAll();
    }

    @Test
    public void testPutAndRemoveJobGraph() throws Exception {
        ZooKeeperSubmittedJobGraphStore zooKeeperSubmittedJobGraphStore = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testPutAndRemoveJobGraph", localStateStorage, Executors.directExecutor());
        try {
            SubmittedJobGraphStore.SubmittedJobGraphListener submittedJobGraphListener = (SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.mock(SubmittedJobGraphStore.SubmittedJobGraphListener.class);
            zooKeeperSubmittedJobGraphStore.start(submittedJobGraphListener);
            SubmittedJobGraph createSubmittedJobGraph = createSubmittedJobGraph(new JobID(), 0L);
            Assert.assertEquals(0L, zooKeeperSubmittedJobGraphStore.getJobIds().size());
            zooKeeperSubmittedJobGraphStore.putJobGraph(createSubmittedJobGraph);
            Collection jobIds = zooKeeperSubmittedJobGraphStore.getJobIds();
            Assert.assertEquals(1L, jobIds.size());
            verifyJobGraphs(createSubmittedJobGraph, zooKeeperSubmittedJobGraphStore.recoverJobGraph((JobID) jobIds.iterator().next()));
            SubmittedJobGraph createSubmittedJobGraph2 = createSubmittedJobGraph(createSubmittedJobGraph.getJobId(), 1L);
            zooKeeperSubmittedJobGraphStore.putJobGraph(createSubmittedJobGraph2);
            Collection jobIds2 = zooKeeperSubmittedJobGraphStore.getJobIds();
            Assert.assertEquals(1L, jobIds2.size());
            verifyJobGraphs(createSubmittedJobGraph2, zooKeeperSubmittedJobGraphStore.recoverJobGraph((JobID) jobIds2.iterator().next()));
            zooKeeperSubmittedJobGraphStore.removeJobGraph(createSubmittedJobGraph2.getJobId());
            Assert.assertEquals(0L, zooKeeperSubmittedJobGraphStore.getJobIds().size());
            ((SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.verify(submittedJobGraphListener, Mockito.atMost(1))).onAddedJobGraph((JobID) Matchers.any(JobID.class));
            ((SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.verify(submittedJobGraphListener, Mockito.never())).onRemovedJobGraph((JobID) Matchers.any(JobID.class));
            zooKeeperSubmittedJobGraphStore.removeJobGraph(createSubmittedJobGraph2.getJobId());
            zooKeeperSubmittedJobGraphStore.stop();
        } catch (Throwable th) {
            zooKeeperSubmittedJobGraphStore.stop();
            throw th;
        }
    }

    @Test
    public void testRecoverJobGraphs() throws Exception {
        ZooKeeperSubmittedJobGraphStore zooKeeperSubmittedJobGraphStore = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage, Executors.directExecutor());
        try {
            SubmittedJobGraphStore.SubmittedJobGraphListener submittedJobGraphListener = (SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.mock(SubmittedJobGraphStore.SubmittedJobGraphListener.class);
            zooKeeperSubmittedJobGraphStore.start(submittedJobGraphListener);
            HashMap hashMap = new HashMap();
            JobID[] jobIDArr = {new JobID(), new JobID(), new JobID()};
            hashMap.put(jobIDArr[0], createSubmittedJobGraph(jobIDArr[0], 0L));
            hashMap.put(jobIDArr[1], createSubmittedJobGraph(jobIDArr[1], 1L));
            hashMap.put(jobIDArr[2], createSubmittedJobGraph(jobIDArr[2], 2L));
            Iterator it = hashMap.values().iterator();
            while (it.hasNext()) {
                zooKeeperSubmittedJobGraphStore.putJobGraph((SubmittedJobGraph) it.next());
            }
            Collection jobIds = zooKeeperSubmittedJobGraphStore.getJobIds();
            Assert.assertEquals(hashMap.size(), jobIds.size());
            Iterator it2 = jobIds.iterator();
            while (it2.hasNext()) {
                SubmittedJobGraph recoverJobGraph = zooKeeperSubmittedJobGraphStore.recoverJobGraph((JobID) it2.next());
                Assert.assertTrue(hashMap.containsKey(recoverJobGraph.getJobId()));
                verifyJobGraphs((SubmittedJobGraph) hashMap.get(recoverJobGraph.getJobId()), recoverJobGraph);
                zooKeeperSubmittedJobGraphStore.removeJobGraph(recoverJobGraph.getJobId());
            }
            Assert.assertEquals(0L, zooKeeperSubmittedJobGraphStore.getJobIds().size());
            ((SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.verify(submittedJobGraphListener, Mockito.atMost(hashMap.size()))).onAddedJobGraph((JobID) Matchers.any(JobID.class));
            ((SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.verify(submittedJobGraphListener, Mockito.never())).onRemovedJobGraph((JobID) Matchers.any(JobID.class));
            zooKeeperSubmittedJobGraphStore.stop();
        } catch (Throwable th) {
            zooKeeperSubmittedJobGraphStore.stop();
            throw th;
        }
    }

    @Test
    public void testConcurrentAddJobGraph() throws Exception {
        ZooKeeperSubmittedJobGraphStore zooKeeperSubmittedJobGraphStore = null;
        ZooKeeperSubmittedJobGraphStore zooKeeperSubmittedJobGraphStore2 = null;
        try {
            zooKeeperSubmittedJobGraphStore = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
            zooKeeperSubmittedJobGraphStore2 = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
            SubmittedJobGraph createSubmittedJobGraph = createSubmittedJobGraph(new JobID(), 0L);
            SubmittedJobGraph createSubmittedJobGraph2 = createSubmittedJobGraph(new JobID(), 0L);
            SubmittedJobGraphStore.SubmittedJobGraphListener submittedJobGraphListener = (SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.mock(SubmittedJobGraphStore.SubmittedJobGraphListener.class);
            final JobID[] jobIDArr = new JobID[1];
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            ((SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphsStoreITCase.2
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Void m223answer(InvocationOnMock invocationOnMock) throws Throwable {
                    jobIDArr[0] = (JobID) invocationOnMock.getArguments()[0];
                    countDownLatch.countDown();
                    return null;
                }
            }).when(submittedJobGraphListener)).onAddedJobGraph((JobID) Matchers.any(JobID.class));
            zooKeeperSubmittedJobGraphStore.start(submittedJobGraphListener);
            zooKeeperSubmittedJobGraphStore2.start((SubmittedJobGraphStore.SubmittedJobGraphListener) null);
            zooKeeperSubmittedJobGraphStore.putJobGraph(createSubmittedJobGraph);
            ((SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.verify(submittedJobGraphListener, Mockito.never())).onAddedJobGraph((JobID) Matchers.any(JobID.class));
            ((SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.verify(submittedJobGraphListener, Mockito.never())).onRemovedJobGraph((JobID) Matchers.any(JobID.class));
            zooKeeperSubmittedJobGraphStore2.putJobGraph(createSubmittedJobGraph2);
            countDownLatch.await();
            ((SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.verify(submittedJobGraphListener, Mockito.times(1))).onAddedJobGraph((JobID) Matchers.any(JobID.class));
            ((SubmittedJobGraphStore.SubmittedJobGraphListener) Mockito.verify(submittedJobGraphListener, Mockito.never())).onRemovedJobGraph((JobID) Matchers.any(JobID.class));
            Assert.assertEquals(createSubmittedJobGraph2.getJobId(), jobIDArr[0]);
            if (zooKeeperSubmittedJobGraphStore != null) {
                zooKeeperSubmittedJobGraphStore.stop();
            }
            if (zooKeeperSubmittedJobGraphStore2 != null) {
                zooKeeperSubmittedJobGraphStore2.stop();
            }
        } catch (Throwable th) {
            if (zooKeeperSubmittedJobGraphStore != null) {
                zooKeeperSubmittedJobGraphStore.stop();
            }
            if (zooKeeperSubmittedJobGraphStore2 != null) {
                zooKeeperSubmittedJobGraphStore2.stop();
            }
            throw th;
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
        ZooKeeperSubmittedJobGraphStore zooKeeperSubmittedJobGraphStore = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
        ZooKeeperSubmittedJobGraphStore zooKeeperSubmittedJobGraphStore2 = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, Executors.directExecutor());
        zooKeeperSubmittedJobGraphStore.start((SubmittedJobGraphStore.SubmittedJobGraphListener) null);
        zooKeeperSubmittedJobGraphStore2.start((SubmittedJobGraphStore.SubmittedJobGraphListener) null);
        SubmittedJobGraph createSubmittedJobGraph = createSubmittedJobGraph(new JobID(), 0L);
        zooKeeperSubmittedJobGraphStore.putJobGraph(createSubmittedJobGraph);
        zooKeeperSubmittedJobGraphStore2.putJobGraph(createSubmittedJobGraph);
    }

    private SubmittedJobGraph createSubmittedJobGraph(JobID jobID, long j) {
        JobGraph jobGraph = new JobGraph(jobID, "Test JobGraph");
        JobVertex jobVertex = new JobVertex("Test JobVertex");
        jobVertex.setParallelism(1);
        jobGraph.addVertex(jobVertex);
        return new SubmittedJobGraph(jobGraph, new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, j, 2147483647L));
    }

    protected void verifyJobGraphs(SubmittedJobGraph submittedJobGraph, SubmittedJobGraph submittedJobGraph2) throws Exception {
        JobGraph jobGraph = submittedJobGraph.getJobGraph();
        JobGraph jobGraph2 = submittedJobGraph2.getJobGraph();
        Assert.assertEquals(jobGraph.getName(), jobGraph2.getName());
        Assert.assertEquals(jobGraph.getJobID(), jobGraph2.getJobID());
        Assert.assertEquals(submittedJobGraph.getJobInfo(), submittedJobGraph2.getJobInfo());
    }
}
