package org.apache.flink.runtime.query;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.class */
public class AkkaKvStateLocationLookupServiceTest extends TestLogger {
    /** Ten-second bound passed to the lookup service under test and used when awaiting its futures. */
    private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
    /** Actor system shared by all tests; assigned in {@code setUp()} and shut down in {@code tearDown()}. */
    private static ActorSystem testActorSystem;

    /**
     * Test actor that plays the responder side of a KvState location lookup.
     *
     * <p>Every {@link KvStateMessage.LookupKvStateLocation} it receives is recorded in the queue
     * handed to the constructor and answered with the next canned response, if one is left: a
     * {@link Throwable} is sent back wrapped in {@link Status.Failure}, any other object wrapped in
     * {@link Status.Success}. When the canned responses are exhausted the lookup is recorded but
     * not answered. A {@link UUID} message replaces the value returned by
     * {@link #getLeaderSessionID()}; all other messages are only logged.
     */
    private static final class LookupResponseActor extends FlinkUntypedActor {
        /** Sink that collects every lookup message reaching this actor. */
        private final Queue<KvStateMessage.LookupKvStateLocation> receivedLookups;

        /** Canned replies, consumed in FIFO order, one per received lookup. */
        private final Queue<Object> lookupResponses = new ArrayDeque<>();

        /** Session ID currently reported by this actor; may be {@code null}. */
        private UUID leaderSessionId;

        /**
         * @param queue  queue that receives every lookup message; must not be {@code null}
         * @param uuid   initial leader session ID (may be {@code null})
         * @param objArr canned responses in the order they should be sent; may be {@code null}
         */
        public LookupResponseActor(Queue<KvStateMessage.LookupKvStateLocation> queue, UUID uuid, Object... objArr) {
            this.receivedLookups = Preconditions.checkNotNull(queue, "Received lookups");
            this.leaderSessionId = uuid;
            if (objArr != null) {
                for (Object response : objArr) {
                    this.lookupResponses.add(response);
                }
            }
        }

        /**
         * Records lookups and replies with the next canned response; updates the session ID on a
         * {@link UUID} message; logs everything else at debug level.
         */
        public void handleMessage(Object obj) throws Exception {
            if (obj instanceof KvStateMessage.LookupKvStateLocation) {
                this.receivedLookups.add((KvStateMessage.LookupKvStateLocation) obj);

                Object response = this.lookupResponses.poll();
                if (response instanceof Throwable) {
                    sender().tell(new Status.Failure((Throwable) response), self());
                } else if (response != null) {
                    sender().tell(new Status.Success(response), self());
                }
                // response == null: no canned reply left, the lookup stays unanswered.
            } else if (obj instanceof UUID) {
                this.leaderSessionId = (UUID) obj;
            } else {
                this.LOG.debug("Received unhandled message: {}", obj);
            }
        }

        /** Returns the session ID set at construction or by the most recent {@link UUID} message. */
        protected UUID getLeaderSessionID() {
            return this.leaderSessionId;
        }

        /**
         * Starts a new {@code LookupResponseActor} in the enclosing test's actor system.
         *
         * @param queue  queue that receives every lookup message
         * @param uuid   initial leader session ID (may be {@code null})
         * @param objArr canned responses, forwarded as a single array constructor argument
         * @return reference to the started actor
         */
        public static ActorRef create(Queue<KvStateMessage.LookupKvStateLocation> queue, UUID uuid, Object... objArr) {
            Props props = Props.create(LookupResponseActor.class, queue, uuid, objArr);
            return AkkaKvStateLocationLookupServiceTest.testActorSystem.actorOf(props);
        }
    }

    /** Creates the local actor system that all tests in this class share. */
    @BeforeClass
    public static void setUp() throws Exception {
        Configuration config = new Configuration();
        testActorSystem = AkkaUtils.createLocalActorSystem(config);
    }

    /** Shuts the shared actor system down, if one was created. */
    @AfterClass
    public static void tearDown() throws Exception {
        if (testActorSystem == null) {
            // Nothing was started, so there is nothing to stop.
            return;
        }
        testActorSystem.shutdown();
    }

    /**
     * Checks that lookups fail with {@link UnknownJobManager} while no leader has been announced,
     * succeed once a leader is announced, and fail again after the leader is withdrawn.
     */
    @Test
    public void testNoJobManagerRegistered() throws Exception {
        TestingLeaderRetrievalService leaderRetrieval = new TestingLeaderRetrievalService(null, null);
        LinkedBlockingQueue<KvStateMessage.LookupKvStateLocation> received = new LinkedBlockingQueue<>();

        AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
                leaderRetrieval,
                testActorSystem,
                TIMEOUT,
                new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
        lookupService.start();

        // Phase 1: no leader announced yet.
        try {
            Await.result(lookupService.getKvStateLookupInfo(new JobID(), "coffee"), TIMEOUT);
            Assert.fail("Did not throw expected Exception");
        } catch (UnknownJobManager ignored) {
            // Expected: there is nobody to ask.
        }
        Assert.assertEquals("Received unexpected lookup", 0L, received.size());

        // Phase 2: announce a leader backed by a responder actor with one canned answer.
        UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID;
        KvStateLocation expectedLocation = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea");
        ActorRef responder = LookupResponseActor.create(received, leaderSessionId, expectedLocation);
        String responderAddress = AkkaUtils.getAkkaURL(testActorSystem, responder);
        leaderRetrieval.notifyListener(responderAddress, leaderSessionId);

        JobID jobId = new JobID();
        KvStateLocation actualLocation =
                (KvStateLocation) Await.result(lookupService.getKvStateLookupInfo(jobId, "tea"), TIMEOUT);
        Assert.assertEquals(expectedLocation, actualLocation);
        Assert.assertEquals(1L, received.size());
        verifyLookupMsg(received.poll(), jobId, "tea");

        // Phase 3: withdraw the leader again.
        leaderRetrieval.notifyListener(null, null);
        try {
            Await.result(lookupService.getKvStateLookupInfo(new JobID(), "coffee"), TIMEOUT);
            Assert.fail("Did not throw expected Exception");
        } catch (UnknownJobManager ignored) {
            // Expected: the leader is gone.
        }
        Assert.assertEquals(0L, received.size());
    }

    /**
     * Checks that the lookup service follows a leader change: after each
     * {@code notifyListener} call the next lookup must be answered by the newly announced actor.
     *
     * <p>Two responder actors share one "received" queue; each holds a single canned
     * {@link KvStateLocation} so the answer identifies which actor was asked.
     */
    @Test
    public void testLeaderSessionIdChange() throws Exception {
        TestingLeaderRetrievalService leaderRetrieval =
                new TestingLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
        LinkedBlockingQueue<KvStateMessage.LookupKvStateLocation> received = new LinkedBlockingQueue<>();

        AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
                leaderRetrieval,
                testActorSystem,
                TIMEOUT,
                new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
        lookupService.start();

        // First leader.
        KvStateLocation firstLocation = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt");
        UUID firstSessionId = UUID.randomUUID();
        String firstAddress = AkkaUtils.getAkkaURL(
                testActorSystem, LookupResponseActor.create(received, firstSessionId, firstLocation));

        // Second leader. It is created with its own session ID so that the actor agrees with
        // what is announced below (previously the first leader's ID was passed here by mistake).
        KvStateLocation secondLocation = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper");
        UUID secondSessionId = UUID.randomUUID();
        String secondAddress = AkkaUtils.getAkkaURL(
                testActorSystem, LookupResponseActor.create(received, secondSessionId, secondLocation));

        JobID jobId = new JobID();

        // Announce the first leader and look something up.
        leaderRetrieval.notifyListener(firstAddress, firstSessionId);
        KvStateLocation firstAnswer =
                (KvStateLocation) Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT);
        Assert.assertEquals(firstLocation, firstAnswer);
        Assert.assertEquals(1L, received.size());
        verifyLookupMsg(received.poll(), jobId, "rock");

        // Announce the second leader; the next lookup must reach the second actor.
        leaderRetrieval.notifyListener(secondAddress, secondSessionId);
        KvStateLocation secondAnswer =
                (KvStateLocation) Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT);
        Assert.assertEquals(secondLocation, secondAnswer);
        Assert.assertEquals(1L, received.size());
        verifyLookupMsg(received.poll(), jobId, "roll");
    }

    /**
     * Exercises the retry path of the lookup service when no leader is known.
     *
     * <p>Retry strategies are fed to the service through a queue-backed factory. The first
     * strategy allows exactly one retry and the test only checks that it was consulted. The second
     * strategy announces a leader from inside {@code tryRetry()}, after which the lookup is
     * expected to return the responder's canned location.
     */
    @Test
    public void testRetryOnUnknownJobManager() throws Exception {
        final LinkedBlockingQueue<AkkaKvStateLocationLookupService.LookupRetryStrategy> retryStrategies =
                new LinkedBlockingQueue<>();

        // Factory that hands out whatever strategy the test queued most recently.
        AkkaKvStateLocationLookupService.LookupRetryStrategyFactory retryStrategyFactory =
                new AkkaKvStateLocationLookupService.LookupRetryStrategyFactory() {
                    public AkkaKvStateLocationLookupService.LookupRetryStrategy createRetryStrategy() {
                        return retryStrategies.poll();
                    }
                };

        final TestingLeaderRetrievalService leaderRetrieval = new TestingLeaderRetrievalService(null, null);
        AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
                leaderRetrieval, testActorSystem, TIMEOUT, retryStrategyFactory);
        lookupService.start();

        // Round 1: a strategy that permits a single, immediate retry.
        final AtomicBoolean retried = new AtomicBoolean();
        retryStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
            public FiniteDuration getRetryDelay() {
                return FiniteDuration.Zero();
            }

            public boolean tryRetry() {
                // true on the first call only
                return retried.compareAndSet(false, true);
            }
        });

        Await.ready(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), TIMEOUT);
        Assert.assertTrue("Did not retry ", retried.get());

        // Round 2: the retry itself makes a leader available.
        LinkedBlockingQueue<KvStateMessage.LookupKvStateLocation> received = new LinkedBlockingQueue<>();
        KvStateLocation expectedLocation = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic");
        ActorRef responder = LookupResponseActor.create(received, null, expectedLocation);
        final String responderAddress = AkkaUtils.getAkkaURL(testActorSystem, responder);

        retryStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
            public FiniteDuration getRetryDelay() {
                return FiniteDuration.apply(100L, TimeUnit.MILLISECONDS);
            }

            public boolean tryRetry() {
                leaderRetrieval.notifyListener(responderAddress, HighAvailabilityServices.DEFAULT_LEADER_ID);
                return true;
            }
        });

        KvStateLocation actualLocation =
                (KvStateLocation) Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), TIMEOUT);
        Assert.assertEquals(expectedLocation, actualLocation);
    }

    /**
     * Checks that a lookup fails when the responder answers with an object that is not a
     * {@link KvStateLocation} (here: a plain {@code String}).
     *
     * <p>The outcome is recorded in a flag and asserted after the {@code try} block. Calling
     * {@code Assert.fail} inside a {@code try} guarded by {@code catch (Throwable)} would swallow
     * the resulting {@link AssertionError}, so the test could never fail.
     */
    @Test
    public void testUnexpectedResponseType() throws Exception {
        TestingLeaderRetrievalService leaderRetrieval =
                new TestingLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
        LinkedBlockingQueue<KvStateMessage.LookupKvStateLocation> received = new LinkedBlockingQueue<>();

        AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
                leaderRetrieval,
                testActorSystem,
                TIMEOUT,
                new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
        lookupService.start();

        // Responder whose only canned answer has the wrong type.
        ActorRef responder = LookupResponseActor.create(received, null, "unexpected-response-type");
        leaderRetrieval.notifyListener(AkkaUtils.getAkkaURL(testActorSystem, responder), null);

        boolean lookupFailed = false;
        try {
            Await.result(lookupService.getKvStateLookupInfo(new JobID(), "spicy"), TIMEOUT);
        } catch (Throwable ignored) {
            // Any failure is accepted, as before; the concrete type is not pinned by this test.
            lookupFailed = true;
        }
        Assert.assertTrue("Did not throw expected Exception", lookupFailed);
    }

    /**
     * Asserts that a recorded lookup message exists and carries the given job ID and
     * registration name.
     *
     * @param lookupKvStateLocation message taken from the "received" queue; must not be {@code null}
     * @param jobID                 job ID the message is expected to carry
     * @param str                   registration name the message is expected to carry
     */
    private static void verifyLookupMsg(KvStateMessage.LookupKvStateLocation lookupKvStateLocation, JobID jobID, String str) {
        Assert.assertNotNull(lookupKvStateLocation);

        JobID actualJobId = lookupKvStateLocation.getJobId();
        Assert.assertEquals(jobID, actualJobId);

        String actualName = lookupKvStateLocation.getRegistrationName();
        Assert.assertEquals(str, actualName);
    }
}
