package org.apache.flink.runtime.rest.handler.legacy;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.class */
public class ExecutionGraphCacheTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest$SuspendableAccessExecutionGraph.class */
    private static final class SuspendableAccessExecutionGraph extends ArchivedExecutionGraph {
        private static final long serialVersionUID = -6796543726305778101L;
        private JobStatus jobStatus;

        public SuspendableAccessExecutionGraph(JobID jobID) {
            super(jobID, "ExecutionGraphCacheTest", Collections.emptyMap(), Collections.emptyList(), new long[0], JobStatus.RUNNING, new ErrorInfo(new FlinkException("Test"), 42L), "", new StringifiedAccumulatorResult[0], Collections.emptyMap(), new ArchivedExecutionConfig(new ExecutionConfig()), false, (CheckpointCoordinatorConfiguration) null, (CheckpointStatsSnapshot) null);
            this.jobStatus = super.getState();
        }

        public JobStatus getState() {
            return this.jobStatus;
        }

        public void setJobStatus(JobStatus jobStatus) {
            this.jobStatus = jobStatus;
        }
    }

    @Test
    public void testExecutionGraphCaching() throws Exception {
        Time milliseconds = Time.milliseconds(100L);
        Time hours = Time.hours(1L);
        JobID jobID = new JobID();
        AccessExecutionGraph accessExecutionGraph = (AccessExecutionGraph) Mockito.mock(AccessExecutionGraph.class);
        JobManagerGateway jobManagerGateway = (JobManagerGateway) Mockito.mock(JobManagerGateway.class);
        Mockito.when(jobManagerGateway.requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
        ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(milliseconds, hours);
        Throwable th = null;
        try {
            Assert.assertEquals(accessExecutionGraph, executionGraphCache.getExecutionGraph(jobID, jobManagerGateway).get());
            Assert.assertEquals(accessExecutionGraph, executionGraphCache.getExecutionGraph(jobID, jobManagerGateway).get());
            ((JobManagerGateway) Mockito.verify(jobManagerGateway, Mockito.times(1))).requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class));
            if (executionGraphCache != null) {
                if (0 == 0) {
                    executionGraphCache.close();
                    return;
                }
                try {
                    executionGraphCache.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (executionGraphCache != null) {
                if (0 != 0) {
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    executionGraphCache.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testExecutionGraphEntryInvalidation() throws Exception {
        Time milliseconds = Time.milliseconds(100L);
        Time milliseconds2 = Time.milliseconds(1L);
        JobID jobID = new JobID();
        AccessExecutionGraph accessExecutionGraph = (AccessExecutionGraph) Mockito.mock(AccessExecutionGraph.class);
        JobManagerGateway jobManagerGateway = (JobManagerGateway) Mockito.mock(JobManagerGateway.class);
        Mockito.when(jobManagerGateway.requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
        ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(milliseconds, milliseconds2);
        Throwable th = null;
        try {
            try {
                Assert.assertEquals(accessExecutionGraph, executionGraphCache.getExecutionGraph(jobID, jobManagerGateway).get());
                Thread.sleep(milliseconds2.toMilliseconds());
                Assert.assertEquals(accessExecutionGraph, executionGraphCache.getExecutionGraph(jobID, jobManagerGateway).get());
                ((JobManagerGateway) Mockito.verify(jobManagerGateway, Mockito.times(2))).requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class));
                if (executionGraphCache != null) {
                    if (0 == 0) {
                        executionGraphCache.close();
                        return;
                    }
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executionGraphCache != null) {
                if (th != null) {
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executionGraphCache.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testImmediateCacheInvalidationAfterFailure() throws Exception {
        Time milliseconds = Time.milliseconds(100L);
        Time hours = Time.hours(1L);
        JobID jobID = new JobID();
        AccessExecutionGraph accessExecutionGraph = (AccessExecutionGraph) Mockito.mock(AccessExecutionGraph.class);
        JobManagerGateway jobManagerGateway = (JobManagerGateway) Mockito.mock(JobManagerGateway.class);
        Mockito.when(jobManagerGateway.requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class))).thenReturn(FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID)), new CompletableFuture[]{CompletableFuture.completedFuture(accessExecutionGraph)});
        ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(milliseconds, hours);
        Throwable th = null;
        try {
            try {
                try {
                    executionGraphCache.getExecutionGraph(jobID, jobManagerGateway).get();
                    Assert.fail("The execution graph future should have been completed exceptionally.");
                } catch (ExecutionException e) {
                    Assert.assertTrue(e.getCause() instanceof FlinkException);
                }
                Assert.assertEquals(accessExecutionGraph, executionGraphCache.getExecutionGraph(jobID, jobManagerGateway).get());
                if (executionGraphCache != null) {
                    if (0 == 0) {
                        executionGraphCache.close();
                        return;
                    }
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executionGraphCache != null) {
                if (th != null) {
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executionGraphCache.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testCacheEntryCleanup() throws Exception {
        Time milliseconds = Time.milliseconds(100L);
        Time milliseconds2 = Time.milliseconds(1L);
        JobID jobID = new JobID();
        JobID jobID2 = new JobID();
        AccessExecutionGraph accessExecutionGraph = (AccessExecutionGraph) Mockito.mock(AccessExecutionGraph.class);
        AccessExecutionGraph accessExecutionGraph2 = (AccessExecutionGraph) Mockito.mock(AccessExecutionGraph.class);
        JobManagerGateway jobManagerGateway = (JobManagerGateway) Mockito.mock(JobManagerGateway.class);
        Mockito.when(jobManagerGateway.requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
        Mockito.when(jobManagerGateway.requestJob((JobID) Matchers.eq(jobID2), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph2));
        ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(milliseconds, milliseconds2);
        Throwable th = null;
        try {
            try {
                CompletableFuture executionGraph = executionGraphCache.getExecutionGraph(jobID, jobManagerGateway);
                CompletableFuture executionGraph2 = executionGraphCache.getExecutionGraph(jobID2, jobManagerGateway);
                Assert.assertEquals(accessExecutionGraph, executionGraph.get());
                Assert.assertEquals(accessExecutionGraph2, executionGraph2.get());
                ((JobManagerGateway) Mockito.verify(jobManagerGateway, Mockito.times(1))).requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class));
                ((JobManagerGateway) Mockito.verify(jobManagerGateway, Mockito.times(1))).requestJob((JobID) Matchers.eq(jobID2), (Time) Matchers.any(Time.class));
                Thread.sleep(milliseconds2.toMilliseconds());
                executionGraphCache.cleanup();
                Assert.assertTrue(executionGraphCache.size() == 0);
                if (executionGraphCache != null) {
                    if (0 == 0) {
                        executionGraphCache.close();
                        return;
                    }
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executionGraphCache != null) {
                if (th != null) {
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executionGraphCache.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testConcurrentAccess() throws Exception {
        Time milliseconds = Time.milliseconds(100L);
        Time hours = Time.hours(1L);
        JobID jobID = new JobID();
        AccessExecutionGraph accessExecutionGraph = (AccessExecutionGraph) Mockito.mock(AccessExecutionGraph.class);
        JobManagerGateway jobManagerGateway = (JobManagerGateway) Mockito.mock(JobManagerGateway.class);
        Mockito.when(jobManagerGateway.requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph));
        ArrayList arrayList = new ArrayList(10);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        try {
            ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(milliseconds, hours);
            Throwable th = null;
            for (int i = 0; i < 10; i++) {
                try {
                    try {
                        arrayList.add(CompletableFuture.supplyAsync(() -> {
                            return executionGraphCache.getExecutionGraph(jobID, jobManagerGateway);
                        }, newFixedThreadPool).thenCompose(Function.identity()));
                    } finally {
                    }
                } finally {
                }
            }
            Iterator it = ((Collection) FutureUtils.combineAll(arrayList).get()).iterator();
            while (it.hasNext()) {
                Assert.assertEquals(accessExecutionGraph, (AccessExecutionGraph) it.next());
            }
            ((JobManagerGateway) Mockito.verify(jobManagerGateway, Mockito.times(1))).requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class));
            if (executionGraphCache != null) {
                if (0 != 0) {
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    executionGraphCache.close();
                }
            }
            org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, new ExecutorService[]{newFixedThreadPool});
        } catch (Throwable th3) {
            org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, new ExecutorService[]{newFixedThreadPool});
            throw th3;
        }
    }

    @Test
    public void testCacheInvalidationIfSuspended() throws Exception {
        Time milliseconds = Time.milliseconds(100L);
        Time hours = Time.hours(1L);
        JobID jobID = new JobID();
        AccessExecutionGraph accessExecutionGraph = (AccessExecutionGraph) Mockito.mock(AccessExecutionGraph.class);
        AccessExecutionGraph accessExecutionGraph2 = (AccessExecutionGraph) Mockito.mock(AccessExecutionGraph.class);
        Mockito.when(accessExecutionGraph2.getState()).thenReturn(JobStatus.SUSPENDED);
        JobManagerGateway jobManagerGateway = (JobManagerGateway) Mockito.mock(JobManagerGateway.class);
        Mockito.when(jobManagerGateway.requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph2), new CompletableFuture[]{CompletableFuture.completedFuture(accessExecutionGraph)});
        ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(milliseconds, hours);
        Throwable th = null;
        try {
            try {
                Assert.assertEquals(accessExecutionGraph2, executionGraphCache.getExecutionGraph(jobID, jobManagerGateway).get());
                Assert.assertEquals(accessExecutionGraph, executionGraphCache.getExecutionGraph(jobID, jobManagerGateway).get());
                if (executionGraphCache != null) {
                    if (0 == 0) {
                        executionGraphCache.close();
                        return;
                    }
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executionGraphCache != null) {
                if (th != null) {
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executionGraphCache.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testCacheInvalidationIfSwitchToSuspended() throws Exception {
        Time milliseconds = Time.milliseconds(100L);
        Time hours = Time.hours(1L);
        JobID jobID = new JobID();
        AccessExecutionGraph accessExecutionGraph = (AccessExecutionGraph) Mockito.mock(AccessExecutionGraph.class);
        SuspendableAccessExecutionGraph suspendableAccessExecutionGraph = new SuspendableAccessExecutionGraph(jobID);
        JobManagerGateway jobManagerGateway = (JobManagerGateway) Mockito.mock(JobManagerGateway.class);
        Mockito.when(jobManagerGateway.requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(suspendableAccessExecutionGraph), new CompletableFuture[]{CompletableFuture.completedFuture(accessExecutionGraph)});
        ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(milliseconds, hours);
        Throwable th = null;
        try {
            try {
                Assert.assertEquals(suspendableAccessExecutionGraph, executionGraphCache.getExecutionGraph(jobID, jobManagerGateway).get());
                suspendableAccessExecutionGraph.setJobStatus(JobStatus.SUSPENDED);
                Assert.assertEquals(accessExecutionGraph, executionGraphCache.getExecutionGraph(jobID, jobManagerGateway).get());
                Assert.assertEquals(accessExecutionGraph, executionGraphCache.getExecutionGraph(jobID, jobManagerGateway).get());
                ((JobManagerGateway) Mockito.verify(jobManagerGateway, Mockito.times(2))).requestJob((JobID) Matchers.eq(jobID), (Time) Matchers.any(Time.class));
                if (executionGraphCache != null) {
                    if (0 == 0) {
                        executionGraphCache.close();
                        return;
                    }
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executionGraphCache != null) {
                if (th != null) {
                    try {
                        executionGraphCache.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executionGraphCache.close();
                }
            }
            throw th4;
        }
    }
}
