package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.TriConsumer;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/** Tests for how the {@link TaskExecutor} tracks and releases result partitions. */
public class TaskExecutorPartitionLifecycleTest extends TestLogger {
    // Timeout handed to RPC calls, to endpoint termination and to the slot table's timer service.
    private static final Time timeout = Time.seconds(10);
    // RPC service shared by every test in this class; gateways are cleared after each test
    // and the service itself is stopped once in shutdownClass().
    private static final TestingRpcService RPC = new TestingRpcService();
    // HA services given to each task executor; setup() wires the two retrievers below into it.
    private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    // Lets a test announce the job master leader for jobId.
    private final SettableLeaderRetrievalService jobManagerLeaderRetriever = new SettableLeaderRetrievalService();
    // Lets a test announce the resource manager leader.
    private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
    // The single job id used by all tests of this class.
    private final JobID jobId = new JobID();

    // Supplies the temporary directory passed to the local state stores manager in testPartitionRelease.
    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest$TestingInvokable.class */
    public static class TestingInvokable extends AbstractInvokable {
        static BlockerSync sync;

        public TestingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            sync.block();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest$TestingShuffleEnvironment.class */
    public static class TestingShuffleEnvironment implements ShuffleEnvironment<ResultPartition, SingleInputGate> {
        private final ShuffleEnvironment<ResultPartition, SingleInputGate> backingShuffleEnvironment;
        CompletableFuture<Collection<ResultPartitionID>> releasePartitionsLocallyFuture;

        private TestingShuffleEnvironment() {
            this.backingShuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
            this.releasePartitionsLocallyFuture = null;
        }

        public int start() throws IOException {
            return this.backingShuffleEnvironment.start();
        }

        public ShuffleIOOwnerContext createShuffleIOOwnerContext(String str, ExecutionAttemptID executionAttemptID, MetricGroup metricGroup) {
            return this.backingShuffleEnvironment.createShuffleIOOwnerContext(str, executionAttemptID, metricGroup);
        }

        public Collection<ResultPartition> createResultPartitionWriters(ShuffleIOOwnerContext shuffleIOOwnerContext, Collection<ResultPartitionDeploymentDescriptor> collection) {
            return this.backingShuffleEnvironment.createResultPartitionWriters(shuffleIOOwnerContext, collection);
        }

        public void releasePartitionsLocally(Collection<ResultPartitionID> collection) {
            this.backingShuffleEnvironment.releasePartitionsLocally(collection);
            if (this.releasePartitionsLocallyFuture != null) {
                this.releasePartitionsLocallyFuture.complete(collection);
            }
        }

        public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() {
            return this.backingShuffleEnvironment.getPartitionsOccupyingLocalResources();
        }

        public Collection<SingleInputGate> createInputGates(ShuffleIOOwnerContext shuffleIOOwnerContext, PartitionProducerStateProvider partitionProducerStateProvider, Collection<InputGateDeploymentDescriptor> collection) {
            return this.backingShuffleEnvironment.createInputGates(shuffleIOOwnerContext, partitionProducerStateProvider, collection);
        }

        public boolean updatePartitionInfo(ExecutionAttemptID executionAttemptID, PartitionInfo partitionInfo) throws IOException, InterruptedException {
            return this.backingShuffleEnvironment.updatePartitionInfo(executionAttemptID, partitionInfo);
        }

        public void close() throws Exception {
            this.backingShuffleEnvironment.close();
        }
    }

    /** Registers this test's settable leader retrievers with the HA services before each test. */
    @Before
    public void setup() {
        haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever);
        haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
    }

    /**
     * Clears the gateways of the shared {@link #RPC} service after each test, so gateways
     * registered by one test (e.g. under the address "jm") are not seen by the next one.
     */
    @After
    public void shutdown() {
        RPC.clearGateways();
    }

    /**
     * Stops the shared {@link #RPC} service once all tests have run and blocks until the
     * future returned by {@code stopService()} has completed.
     *
     * @throws ExecutionException if stopping the service completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @AfterClass
    public static void shutdownClass() throws ExecutionException, InterruptedException {
        RPC.stopService().get();
    }

    /**
     * Verifies that releasing partitions only leads to a disconnect from the job master once the
     * partition table no longer tracks any partition for the job.
     *
     * <p>The first {@code releasePartitions} call names an unrelated partition id, after which the
     * job manager connection must still be present. The second call names the tracked partition,
     * after which the job master's disconnect function is expected to be invoked.
     */
    @Test
    public void testConnectionTerminationAfterExternalRelease() throws Exception {
        final CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
        final JobManagerConnection jobManagerConnection = TaskSubmissionTestEnvironment.createJobManagerConnection(
            jobId,
            new TestingJobMasterGatewayBuilder()
                .setDisconnectTaskManagerFunction(resourceID -> {
                    disconnectFuture.complete(null);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .build(),
            RPC,
            new NoOpTaskManagerActions(),
            timeout);

        final JobManagerTable jobManagerTable = new JobManagerTable();
        jobManagerTable.put(jobId, jobManagerConnection);

        final TestingShuffleEnvironment shuffleEnvironment = new TestingShuffleEnvironment();
        final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
            .setJobManagerTable(jobManagerTable)
            .setShuffleEnvironment(shuffleEnvironment)
            .setTaskSlotTable(createTaskSlotTable())
            .build();

        final PartitionTable<JobID> partitionTable = new PartitionTable<>();
        final ResultPartitionID trackedPartitionId = new ResultPartitionID();

        final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTable);

        // try/finally guarantees exactly one termination attempt; a catch-and-rethrow variant
        // would terminate a second time if the first termination itself failed.
        try {
            taskExecutor.start();
            taskExecutor.waitUntilStarted();

            final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

            // baseline: the connection was put into the table above
            runInTaskExecutorThreadAndWait(taskExecutor, () -> Assert.assertTrue(jobManagerTable.contains(jobId)));

            runInTaskExecutorThreadAndWait(
                taskExecutor,
                () -> partitionTable.startTrackingPartitions(jobId, Collections.singletonList(trackedPartitionId)));

            final CompletableFuture<Collection<ResultPartitionID>> firstReleaseCall = new CompletableFuture<>();
            runInTaskExecutorThreadAndWait(
                taskExecutor,
                () -> shuffleEnvironment.releasePartitionsLocallyFuture = firstReleaseCall);

            // release a partition that is NOT the tracked one
            taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(new ResultPartitionID()));

            // once this completes the shuffle environment has seen the release call
            firstReleaseCall.get();

            // the tracked partition is still there, so the connection must have been kept
            runInTaskExecutorThreadAndWait(taskExecutor, () -> Assert.assertTrue(jobManagerTable.contains(jobId)));

            final CompletableFuture<Collection<ResultPartitionID>> secondReleaseCall = new CompletableFuture<>();
            runInTaskExecutorThreadAndWait(
                taskExecutor,
                () -> shuffleEnvironment.releasePartitionsLocallyFuture = secondReleaseCall);

            // releasing the tracked partition should make the task executor disconnect
            taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(trackedPartitionId));

            disconnectFuture.get();
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
        }
    }

    /**
     * Disconnects the job manager while a BLOCKING partition exists and waits for the shuffle
     * environment to report the local release of that partition.
     */
    @Test
    public void testBlockingPartitionReleaseAfterDisconnect() throws Exception {
        final TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> disconnectJobManager =
            (job, partition, gateway) -> gateway.disconnectJobManager(job, new Exception("test"));

        testPartitionRelease(disconnectJobManager, true, ResultPartitionType.BLOCKING);
    }

    /**
     * Disconnects the job manager while a PIPELINED partition exists; the helper is told not to
     * wait for a local release notification.
     */
    @Test
    public void testPipelinedPartitionNotReleasedAfterDisconnect() throws Exception {
        final TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> disconnectJobManager =
            (job, partition, gateway) -> gateway.disconnectJobManager(job, new Exception("test"));

        testPartitionRelease(disconnectJobManager, false, ResultPartitionType.PIPELINED);
    }

    /**
     * Explicitly releases a BLOCKING partition through the gateway and waits for the shuffle
     * environment to report the local release of that partition.
     */
    @Test
    public void testBlockingPartitionReleaseAfterReleaseCall() throws Exception {
        final TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> releasePartition =
            (job, partition, gateway) -> gateway.releasePartitions(job, Collections.singletonList(partition));

        testPartitionRelease(releasePartition, true, ResultPartitionType.BLOCKING);
    }

    /**
     * Explicitly releases a PIPELINED partition through the gateway and waits for the shuffle
     * environment to report the local release of that partition.
     */
    @Test
    public void testPipelinedPartitionReleaseAfterReleaseCall() throws Exception {
        final TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> releasePartition =
            (job, partition, gateway) -> gateway.releasePartitions(job, Collections.singletonList(partition));

        testPartitionRelease(releasePartition, true, ResultPartitionType.PIPELINED);
    }

    /**
     * Performs no explicit action on the task executor for a BLOCKING partition and does not
     * wait for a release notification; only the helper's own checks and shutdown apply.
     */
    @Test
    public void testBlockingPartitionReleaseAfterShutdown() throws Exception {
        final TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> doNothing =
            (job, partition, gateway) -> {
                // intentionally empty: nothing is triggered before the helper shuts down
            };

        testPartitionRelease(doNothing, false, ResultPartitionType.BLOCKING);
    }

    /**
     * Performs no explicit action on the task executor for a PIPELINED partition and does not
     * wait for a release notification; only the helper's own checks and shutdown apply.
     */
    @Test
    public void testPipelinedPartitionReleaseAfterShutdown() throws Exception {
        final TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> doNothing =
            (job, partition, gateway) -> {
                // intentionally empty: nothing is triggered before the helper shuts down
            };

        testPartitionRelease(doNothing, false, ResultPartitionType.PIPELINED);
    }

    /**
     * Shared scenario for the partition release tests.
     *
     * <p>Starts a task executor, connects it to a testing resource manager and job master,
     * requests a slot and submits one {@link TestingInvokable} task that produces a single
     * partition of the given type. While the task is blocked, and again after it reported
     * FINISHED, the partition table must track the job exactly when the partition type is
     * blocking. Then {@code taskExecutorCall} is run, optionally followed by waiting for the
     * shuffle environment to see a local release of the produced partition. Finally the task
     * executor is terminated and no partition may occupy local resources any more.
     *
     * @param taskExecutorCall action under test; receives the job id, the id of the produced
     *     partition and the task executor gateway
     * @param waitForRelease whether to block until the shuffle environment reports a local
     *     release and to check that it names exactly the produced partition
     * @param resultPartitionType type of the partition the task produces
     * @throws Exception if any step of the scenario fails
     */
    private void testPartitionRelease(
            TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> taskExecutorCall,
            boolean waitForRelease,
            ResultPartitionType resultPartitionType) throws Exception {
        final ResultPartitionDeploymentDescriptor partitionDescriptor =
            PartitionTestUtils.createPartitionDeploymentDescriptor(resultPartitionType);
        final ResultPartitionID partitionId = partitionDescriptor.getShuffleDescriptor().getResultPartitionID();

        final TaskDeploymentDescriptor taskDeploymentDescriptor = TaskExecutorSubmissionTest.createTaskDeploymentDescriptor(
            jobId,
            "job",
            partitionId.getProducerId(),
            new SerializedValue<>(new ExecutionConfig()),
            "Sender",
            1,
            0,
            1,
            0,
            new Configuration(),
            new Configuration(),
            TestingInvokable.class.getName(),
            Collections.singletonList(partitionDescriptor),
            Collections.emptyList(),
            Collections.emptyList(),
            Collections.emptyList(),
            0);

        final TaskSlotTable taskSlotTable = createTaskSlotTable();
        final TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
            false,
            new File[]{tmp.newFolder()},
            Executors.directExecutor());
        final TestingShuffleEnvironment shuffleEnvironment = new TestingShuffleEnvironment();

        final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
            .setTaskSlotTable(taskSlotTable)
            .setTaskStateManager(localStateStoresManager)
            .setShuffleEnvironment(shuffleEnvironment)
            .build();

        final CompletableFuture<Void> taskFinishedFuture = new CompletableFuture<>();
        final OneShotLatch slotOfferedLatch = new OneShotLatch();

        // Declared with its real gateway type so that getFencingToken() is available below.
        final JobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
            .setRegisterTaskManagerFunction((str, taskManagerLocation) -> {
                return CompletableFuture.completedFuture(new JMTMRegistrationSuccess(ResourceID.generate()));
            })
            .setOfferSlotsFunction((resourceID, slotOffers) -> {
                slotOfferedLatch.trigger();
                return CompletableFuture.completedFuture(slotOffers);
            })
            .setUpdateTaskExecutionStateFunction(taskExecutionState -> {
                if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) {
                    taskFinishedFuture.complete(null);
                }
                return CompletableFuture.completedFuture(Acknowledge.get());
            })
            .build();

        final PartitionTable<JobID> partitionTable = new PartitionTable<>();
        final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTable);

        final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();

        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        resourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            initialSlotReportFuture.complete(tuple3.f2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        resourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
            return CompletableFuture.completedFuture(
                new TaskExecutorRegistrationSuccess(
                    new InstanceID(),
                    resourceManagerGateway.getOwnResourceId(),
                    new ClusterInformation("blobServerHost", 55555)));
        });

        try {
            taskExecutor.start();
            taskExecutor.waitUntilStarted();

            final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

            final String jobMasterAddress = "jm";
            RPC.registerGateway(jobMasterAddress, jobMasterGateway);
            RPC.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);

            // inform the task manager about the job leader and the resource manager leader
            taskManagerServices.getJobLeaderService().addJob(jobId, jobMasterAddress);
            jobManagerLeaderRetriever.notifyListener(jobMasterAddress, UUID.randomUUID());
            resourceManagerLeaderRetriever.notifyListener(
                resourceManagerGateway.getAddress(),
                resourceManagerGateway.getFencingToken().toUUID());

            final Optional<SlotStatus> anySlot =
                StreamSupport.stream(initialSlotReportFuture.get().spliterator(), false).findAny();
            Assert.assertTrue(anySlot.isPresent());
            final SlotStatus slotStatus = anySlot.get();

            // Poll until the slot request is accepted; requests fail while the task executor has
            // not finished connecting to the resource manager. Only failed requests are retried,
            // so an interrupt of the test thread is not swallowed by the loop.
            while (true) {
                try {
                    taskExecutorGateway.requestSlot(
                        slotStatus.getSlotID(),
                        jobId,
                        taskDeploymentDescriptor.getAllocationId(),
                        jobMasterAddress,
                        resourceManagerGateway.getFencingToken(),
                        timeout).get();
                    break;
                } catch (ExecutionException ignored) {
                    Thread.sleep(50L);
                }
            }

            TestingInvokable.sync = new BlockerSync();

            // the slot must have been offered to the job master before the task can be submitted
            slotOfferedLatch.await();

            taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterGateway.getFencingToken(), timeout).get();

            TestingInvokable.sync.awaitBlocker();

            // while the task is blocked in invoke(): tracked iff the partition is blocking
            runInTaskExecutorThreadAndWait(
                taskExecutor,
                () -> MatcherAssert.assertThat(
                    partitionTable.hasTrackedPartitions(jobId),
                    Matchers.is(resultPartitionType.isBlocking())));

            TestingInvokable.sync.releaseBlocker();
            taskFinishedFuture.get(timeout.getSize(), timeout.getUnit());

            // after the task reported FINISHED: same expectation
            runInTaskExecutorThreadAndWait(
                taskExecutor,
                () -> MatcherAssert.assertThat(
                    partitionTable.hasTrackedPartitions(jobId),
                    Matchers.is(resultPartitionType.isBlocking())));

            final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
            runInTaskExecutorThreadAndWait(
                taskExecutor,
                () -> shuffleEnvironment.releasePartitionsLocallyFuture = releasePartitionsFuture);

            taskExecutorCall.accept(jobId, partitionId, taskExecutorGateway);

            if (waitForRelease) {
                final Collection<ResultPartitionID> releasedPartitions = releasePartitionsFuture.get();
                MatcherAssert.assertThat(releasedPartitions, Matchers.contains(partitionId));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
        }

        // Checked only after the task executor has been terminated: the scenarios that neither
        // trigger nor wait for a release (e.g. the "...AfterShutdown" tests) can only expect the
        // partitions to be gone once the endpoint has been shut down.
        Assert.assertTrue(shuffleEnvironment.getPartitionsOccupyingLocalResources().isEmpty());
    }

    /**
     * Builds a {@link TestingTaskExecutor} on the shared RPC service using this test's HA
     * services, a default configuration, a blob cache without a server address and an
     * unregistered metric group.
     *
     * @param taskManagerServices services the task executor should run with
     * @param partitionTable partition table the test wants to inspect
     * @return the task executor, not yet started
     * @throws IOException declared because constructing the blob cache may fail
     */
    private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices, PartitionTable<JobID> partitionTable) throws IOException {
        final TaskManagerConfiguration taskManagerConfiguration =
            TaskManagerConfiguration.fromConfiguration(new Configuration());
        final HeartbeatServices heartbeatServices = new HeartbeatServices(10000L, 30000L);
        final BlobCacheService blobCacheService =
            new BlobCacheService(new Configuration(), new VoidBlobStore(), (InetSocketAddress) null);

        return new TestingTaskExecutor(
            RPC,
            taskManagerConfiguration,
            haServices,
            taskManagerServices,
            heartbeatServices,
            UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
            null, // NOTE(review): presumably the optional metric query service address - confirm against the constructor
            blobCacheService,
            new TestingFatalErrorHandler(),
            partitionTable);
    }

    /**
     * Creates a slot table with a single slot of {@link ResourceProfile#UNKNOWN} whose timer
     * service uses the test's default executor and the class-wide timeout.
     */
    private static TaskSlotTable createTaskSlotTable() {
        final long slotTimeoutMillis = timeout.toMilliseconds();
        return new TaskSlotTable(
            Collections.singletonList(ResourceProfile.UNKNOWN),
            new TimerService(TestingUtils.defaultExecutor(), slotTimeoutMillis));
    }

    /**
     * Schedules {@code runnable} with zero delay on the RPC service of {@code taskExecutor} and
     * blocks until the returned future has completed.
     *
     * <p>A failure inside the runnable (for example a failed assertion) surfaces here as an
     * {@link ExecutionException} from {@code get()}.
     *
     * <p>NOTE(review): the runnable is handed to the RpcService scheduler; confirm that this is
     * the thread callers expect when they rely on "task executor thread" semantics.
     *
     * @param taskExecutor task executor whose RPC service runs the action
     * @param runnable action to execute
     * @throws ExecutionException if the runnable threw
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void runInTaskExecutorThreadAndWait(TaskExecutor taskExecutor, Runnable runnable) throws ExecutionException, InterruptedException {
        taskExecutor.getRpcService().scheduleRunnable(runnable, 0L, TimeUnit.SECONDS).get();
    }
}
