package org.apache.flink.test.runtime.leaderelection;

import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.class */
public class ZooKeeperLeaderElectionITCase extends TestLogger {
    private static final Duration TEST_TIMEOUT = Duration.ofMinutes(5);
    private static final Time RPC_TIMEOUT = Time.minutes(1);
    private static TestingServer zkServer;

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase$BlockingOperator.class */
    public static class BlockingOperator extends AbstractInvokable {
        private static final Object lock = new Object();
        private static volatile boolean isBlocking = true;

        public BlockingOperator(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            synchronized (lock) {
                while (isBlocking) {
                    lock.wait();
                }
            }
        }

        public static void unblock() {
            synchronized (lock) {
                isBlocking = false;
                lock.notifyAll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase$NewLeaderRetriever.class */
    public static class NewLeaderRetriever extends LeaderRetriever {
        private final Object lock;

        @Nullable
        private Tuple2<String, UUID> lastAddress;
        private CompletableFuture<Tuple2<String, UUID>> newLeaderFuture;

        private NewLeaderRetriever() {
            this.lock = new Object();
            this.lastAddress = null;
            this.newLeaderFuture = new CompletableFuture<>();
        }

        CompletableFuture<Tuple2<String, UUID>> waitUntilNewLeader() {
            synchronized (this.lock) {
                if (!this.newLeaderFuture.isDone()) {
                    return this.newLeaderFuture.thenApply(tuple2 -> {
                        synchronized (this.lock) {
                            this.newLeaderFuture = new CompletableFuture<>();
                        }
                        return tuple2;
                    });
                }
                CompletableFuture<Tuple2<String, UUID>> completableFuture = this.newLeaderFuture;
                this.newLeaderFuture = new CompletableFuture<>();
                return completableFuture;
            }
        }

        protected void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> completableFuture) {
            completableFuture.whenComplete((tuple2, th) -> {
                synchronized (this.lock) {
                    if (th != null) {
                        this.newLeaderFuture.completeExceptionally(th);
                    } else if (!tuple2.equals(this.lastAddress)) {
                        this.lastAddress = tuple2;
                        if (this.newLeaderFuture.isDone()) {
                            this.newLeaderFuture = CompletableFuture.completedFuture(tuple2);
                        } else {
                            this.newLeaderFuture.complete(tuple2);
                        }
                    }
                }
            });
        }
    }

    @BeforeClass
    public static void setup() throws Exception {
        zkServer = new TestingServer(true);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (zkServer != null) {
            zkServer.close();
            zkServer = null;
        }
    }

    @Test
    public void testJobExecutionOnClusterWithLeaderChange() throws Exception {
        Configuration createZooKeeperHAConfig = ZooKeeperTestUtils.createZooKeeperHAConfig(zkServer.getConnectString(), this.tempFolder.newFolder().getAbsolutePath());
        createZooKeeperHAConfig.setLong(ClusterOptions.REFUSED_REGISTRATION_DELAY, 50L);
        LeaderRetrievalService leaderRetrievalService = null;
        try {
            TestingMiniCluster testingMiniCluster = new TestingMiniCluster(new TestingMiniClusterConfiguration.Builder().setConfiguration(createZooKeeperHAConfig).setNumberDispatcherResourceManagerComponents(3).setNumTaskManagers(2).setNumSlotsPerTaskManager(2).build());
            Throwable th = null;
            try {
                try {
                    Deadline fromNow = Deadline.fromNow(TEST_TIMEOUT);
                    testingMiniCluster.start();
                    JobGraph createJobGraph = createJobGraph(4);
                    testingMiniCluster.submitJob(createJobGraph).get();
                    Collection<DispatcherResourceManagerComponent<?>> dispatcherResourceManagerComponents = testingMiniCluster.getDispatcherResourceManagerComponents();
                    NewLeaderRetriever newLeaderRetriever = new NewLeaderRetriever();
                    leaderRetrievalService = testingMiniCluster.getHighAvailabilityServices().getDispatcherLeaderRetriever();
                    leaderRetrievalService.start(newLeaderRetriever);
                    for (int i = 0; i < 2; i++) {
                        DispatcherResourceManagerComponent<?> leadingDispatcherResourceManagerComponent = getLeadingDispatcherResourceManagerComponent(dispatcherResourceManagerComponents, newLeaderRetriever);
                        Dispatcher dispatcher = leadingDispatcherResourceManagerComponent.getDispatcher();
                        CommonTestUtils.waitUntilCondition(() -> {
                            return Boolean.valueOf(dispatcher.requestJobStatus(createJobGraph.getJobID(), RPC_TIMEOUT).get() == JobStatus.RUNNING);
                        }, fromNow, 50L);
                        leadingDispatcherResourceManagerComponent.closeAsync();
                    }
                    CompletableFuture requestJobResult = getLeadingDispatcherResourceManagerComponent(dispatcherResourceManagerComponents, newLeaderRetriever).getDispatcher().requestJobResult(createJobGraph.getJobID(), RPC_TIMEOUT);
                    BlockingOperator.unblock();
                    Assert.assertThat(Boolean.valueOf(((JobResult) requestJobResult.get()).isSuccess()), Matchers.is(true));
                    if (testingMiniCluster != null) {
                        if (0 != 0) {
                            try {
                                testingMiniCluster.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            testingMiniCluster.close();
                        }
                    }
                    if (leaderRetrievalService != null) {
                        leaderRetrievalService.stop();
                    }
                } finally {
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            throw th4;
        }
    }

    @Nonnull
    protected DispatcherResourceManagerComponent<?> getLeadingDispatcherResourceManagerComponent(Collection<DispatcherResourceManagerComponent<?>> collection, NewLeaderRetriever newLeaderRetriever) throws Exception {
        String str = (String) newLeaderRetriever.waitUntilNewLeader().get().f0;
        return findLeadingDispatcherResourceManagerComponent(collection, str).orElseThrow(() -> {
            return new Exception(String.format("Could not find the leading Dispatcher with address %s", str));
        });
    }

    @Nonnull
    private static Optional<DispatcherResourceManagerComponent<?>> findLeadingDispatcherResourceManagerComponent(Collection<DispatcherResourceManagerComponent<?>> collection, String str) {
        for (DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent : collection) {
            if (dispatcherResourceManagerComponent.getDispatcher().getAddress().equals(str)) {
                return Optional.of(dispatcherResourceManagerComponent);
            }
        }
        return Optional.empty();
    }

    private JobGraph createJobGraph(int i) {
        boolean unused = BlockingOperator.isBlocking = true;
        JobVertex jobVertex = new JobVertex("blocking operator");
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(BlockingOperator.class);
        return new JobGraph("Blocking test job", new JobVertex[]{jobVertex});
    }
}
