package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.util.TestLoggerExtension;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/**
 * Checks that a {@link TaskExecutor} which is shut down and then started again with the same
 * resource id, configuration and working directory reports the slot allocation it held before the
 * shutdown, and offers that slot to the job master.
 */
@ExtendWith(TestLoggerExtension.class)
class TaskExecutorRecoveryTest {
    private final TestingRpcServiceExtension rpcServiceExtension = new TestingRpcServiceExtension();

    @RegisterExtension
    private final EachCallbackWrapper<TestingRpcServiceExtension> eachWrapper =
            new EachCallbackWrapper<>(rpcServiceExtension);

    /**
     * Allocates one of two slots on a task executor, closes it, starts a second task executor on
     * the same working directory and asserts that the allocation shows up again in the slot
     * report and in the slots offered to the job master.
     *
     * @param file temporary directory (injected via {@link TempDir}) used as the working directory
     *     of both task executor instances
     * @throws Exception if starting, closing or waiting on any of the involved components fails
     */
    @Test
    public void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File file) throws Exception {
        final ResourceID resourceId = ResourceID.generate();

        final Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);

        // Declared with its concrete type: the slot-report hook and the fencing token accessor
        // used below are called on this object directly.
        final TestingResourceManagerGateway testingResourceManagerGateway =
                new TestingResourceManagerGateway();

        // Capacity 2: the test consumes one report after each of the two task executor starts.
        final ArrayBlockingQueue<TaskExecutorSlotReport> slotReports = new ArrayBlockingQueue<>(2);
        testingResourceManagerGateway.setSendSlotReportFunction(
                slotReportInformation -> {
                    slotReports.offer(
                            TaskExecutorSlotReport.create(
                                    slotReportInformation.f0, slotReportInformation.f2));
                    return CompletableFuture.completedFuture(Acknowledge.get());
                });

        final TestingRpcService rpcService = rpcServiceExtension.getTestingRpcService();
        rpcService.registerGateway(
                testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);

        final JobID jobId = new JobID();

        final TestingHighAvailabilityServices highAvailabilityServices =
                new TestingHighAvailabilityServices();
        highAvailabilityServices.setResourceManagerLeaderRetriever(
                new SettableLeaderRetrievalService(
                        testingResourceManagerGateway.getAddress(),
                        testingResourceManagerGateway.getFencingToken().toUUID()));

        // The job master leader is announced only after the first task executor has been closed.
        final SettableLeaderRetrievalService jobMasterLeaderRetriever =
                new SettableLeaderRetrievalService();
        highAvailabilityServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever);

        final WorkingDirectory workingDirectory = WorkingDirectory.create(file);

        final TaskExecutor taskExecutor =
                TaskExecutorBuilder.newBuilder(rpcService, highAvailabilityServices, workingDirectory)
                        .setConfiguration(configuration)
                        .setResourceId(resourceId)
                        .build();
        taskExecutor.start();

        final TaskExecutorGateway taskExecutorGateway =
                taskExecutor.getSelfGateway(TaskExecutorGateway.class);

        final SlotReport initialSlotReport = slotReports.take().getSlotReport();
        MatcherAssert.assertThat(initialSlotReport.getNumSlotStatus(), Matchers.is(2));

        // Allocate the first reported slot; the other one stays free.
        final SlotStatus slotStatus = initialSlotReport.iterator().next();
        final SlotID allocatedSlotId = slotStatus.getSlotID();
        final AllocationID allocationId = new AllocationID();

        taskExecutorGateway
                .requestSlot(
                        allocatedSlotId,
                        jobId,
                        allocationId,
                        slotStatus.getResourceProfile(),
                        "localhost",
                        testingResourceManagerGateway.getFencingToken(),
                        Time.seconds(10L))
                .join();

        taskExecutor.close();

        final ArrayBlockingQueue<Collection<SlotOffer>> offeredSlots = new ArrayBlockingQueue<>(1);

        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setOfferSlotsFunction(
                                (offeringResourceId, slotOffers) -> {
                                    // Record a copy of the offers and accept all of them.
                                    offeredSlots.offer(new HashSet<>(slotOffers));
                                    return CompletableFuture.completedFuture(slotOffers);
                                })
                        .build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
        jobMasterLeaderRetriever.notifyListener(
                jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());

        // Recover: a second task executor with the same resource id and working directory.
        // NOTE(review): this instance is never closed explicitly; presumably it is torn down
        // together with the per-test RPC service - confirm.
        final TaskExecutor recoveredTaskExecutor =
                TaskExecutorBuilder.newBuilder(rpcService, highAvailabilityServices, workingDirectory)
                        .setConfiguration(configuration)
                        .setResourceId(resourceId)
                        .build();
        recoveredTaskExecutor.start();

        final SlotReport recoveredSlotReport = slotReports.take().getSlotReport();
        for (SlotStatus recoveredStatus : recoveredSlotReport) {
            if (recoveredStatus.getSlotID().equals(allocatedSlotId)) {
                MatcherAssert.assertThat(recoveredStatus.getJobID(), Matchers.is(jobId));
                MatcherAssert.assertThat(recoveredStatus.getAllocationID(), Matchers.is(allocationId));
            } else {
                MatcherAssert.assertThat(recoveredStatus.getJobID(), Matchers.is(Matchers.nullValue()));
            }
        }

        final Collection<SlotOffer> recoveredOffers = offeredSlots.take();
        MatcherAssert.assertThat(recoveredOffers, Matchers.hasSize(1));

        final SlotOffer offeredSlot = recoveredOffers.iterator().next();
        MatcherAssert.assertThat(offeredSlot.getAllocationId(), Matchers.is(allocationId));
    }

    /** Immutable pair of a task executor's resource id and a slot report it sent. */
    public static final class TaskExecutorSlotReport {
        private final ResourceID taskExecutorResourceId;
        private final SlotReport slotReport;

        private TaskExecutorSlotReport(ResourceID resourceID, SlotReport slotReport) {
            this.taskExecutorResourceId = resourceID;
            this.slotReport = slotReport;
        }

        /** Returns the resource id of the task executor that sent the report. */
        public ResourceID getTaskExecutorResourceId() {
            return taskExecutorResourceId;
        }

        /** Returns the slot report that was sent. */
        public SlotReport getSlotReport() {
            return slotReport;
        }

        /**
         * Creates a new pair from the given values.
         *
         * @param resourceID resource id of the reporting task executor
         * @param slotReport slot report sent by that task executor
         * @return a new {@code TaskExecutorSlotReport} holding both values
         */
        public static TaskExecutorSlotReport create(ResourceID resourceID, SlotReport slotReport) {
            return new TaskExecutorSlotReport(resourceID, slotReport);
        }
    }
}
