package org.apache.flink.test.scheduling;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.class */
public class PipelinedRegionSchedulingITCase extends TestLogger {

    /* loaded from: input_file:org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase$OneTimeFailingReceiverWithPartitionException.class */
    /**
     * Invokable that throws a {@link PartitionNotFoundException} the first time any instance is
     * invoked and completes normally on every invocation after that.
     *
     * <p>The "already failed" state lives in a static flag, so it is shared by all instances of
     * this class until the flag is reset.
     */
    public static class OneTimeFailingReceiverWithPartitionException extends AbstractInvokable {
        /** Becomes {@code true} once the single simulated failure has been thrown. */
        private static final AtomicBoolean hasFailed = new AtomicBoolean();

        public OneTimeFailingReceiverWithPartitionException(Environment environment) {
            super(environment);
        }

        /**
         * Fails once with a {@link PartitionNotFoundException}; later calls are no-ops.
         *
         * @throws PartitionNotFoundException on the first invocation, naming the partition behind
         *     channel 1 of input gate 0
         */
        public void invoke() throws Exception {
            // Atomically claim the one allowed failure; losers of the race return quietly.
            final boolean claimedFailure = hasFailed.compareAndSet(false, true);
            if (!claimedFailure) {
                return;
            }
            throw new PartitionNotFoundException(
                    getEnvironment().getInputGate(0).getChannel(1).getPartitionId());
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase$PipelinedSender.class */
    /**
     * Invokable that emits a single {@link IntValue} record (value 42) through its first result
     * partition writer and then flushes it. Subtask 0 additionally sleeps for two seconds before
     * the writer is closed.
     */
    public static class PipelinedSender extends AbstractInvokable {
        public PipelinedSender(Environment environment) {
            super(environment);
        }

        /**
         * Emits one record and closes the writer.
         *
         * @throws IllegalStateException if the task has no result partition writer
         * @throws Exception if emitting, flushing or sleeping fails
         */
        public void invoke() throws Exception {
            if (getEnvironment().getAllWriters().length < 1) {
                throw new IllegalStateException(
                        "PipelinedSender requires at least one result partition writer.");
            }
            final RecordWriter<IntValue> writer =
                    new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0));
            try {
                writer.emit(new IntValue(42));
                writer.flushAll();
                if (getIndexInSubtaskGroup() == 0) {
                    // NOTE(review): presumably delays subtask 0 so the subtasks finish at
                    // different times — confirm the intent against the upstream test.
                    Thread.sleep(2000L);
                }
            } finally {
                // Always release the writer, even if emit/flush/sleep threw.
                writer.close();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase$Receiver.class */
    /**
     * Invokable that reads and discards every {@link IntValue} record from all of its input
     * gates. It expects at least two input gates.
     */
    public static class Receiver extends AbstractInvokable {
        public Receiver(Environment environment) {
            super(environment);
        }

        /**
         * Drains all input gates one after another.
         *
         * @throws IllegalStateException if the task has fewer than two input gates
         * @throws Exception if reading from any gate fails
         */
        public void invoke() throws Exception {
            if (getEnvironment().getAllInputGates().length < 2) {
                throw new IllegalStateException("Receiver requires at least two input gates.");
            }
            final String[] tmpDirectories =
                    getEnvironment().getTaskManagerInfo().getTmpDirectories();

            // Build one reader per gate up front; reading only starts once all readers exist.
            final List<RecordReader<IntValue>> readers =
                    Arrays.stream(getEnvironment().getAllInputGates())
                            .map(inputGate ->
                                    new RecordReader<IntValue>(
                                            inputGate, IntValue.class, tmpDirectories))
                            .collect(Collectors.toList());

            for (RecordReader<IntValue> reader : readers) {
                // Records are consumed only to exhaust the gate; their values are ignored.
                while (reader.hasNext()) {
                    reader.next();
                }
            }
        }
    }

    /**
     * Runs the default scheduling job on a cluster with two slots and expects the job result to
     * carry no failure cause.
     */
    @Test
    public void testSuccessWithSlotsNoFewerThanTheMaxRegionRequired() throws Exception {
        final JobResult result = executeSchedulingTest(2);
        final boolean jobFailed = result.getSerializedThrowable().isPresent();
        Assert.assertThat(jobFailed, CoreMatchers.is(false));
    }

    /**
     * Runs the default scheduling job on a cluster with a single slot and expects it to fail with
     * a {@link NoResourceAvailableException} whose message reports an unfulfillable slot request
     * bulk.
     */
    @Test
    public void testFailsOnInsufficientSlots() throws Exception {
        final JobResult jobResult = executeSchedulingTest(1);

        final Optional<SerializedThrowable> serializedFailure = jobResult.getSerializedThrowable();
        Assert.assertThat(serializedFailure.isPresent(), CoreMatchers.is(true));

        // Deserialize the failure so the cause chain can be searched by exception type.
        final Throwable jobFailure =
                serializedFailure.get().deserializeError(ClassLoader.getSystemClassLoader());
        final Optional<NoResourceAvailableException> cause =
                ExceptionUtils.findThrowable(jobFailure, NoResourceAvailableException.class);

        Assert.assertThat(cause.isPresent(), CoreMatchers.is(true));
        Assert.assertThat(
                cause.get().getMessage(),
                CoreMatchers.containsString("Slot request bulk is not fulfillable!"));
    }

    /**
     * Runs the three-stage job, whose sink fails once with a partition exception, under a
     * fixed-delay restart strategy allowing one attempt, and expects the job result to carry no
     * failure cause.
     */
    @Test(timeout = 120000)
    public void testRecoverFromPartitionException() throws Exception {
        final Configuration restartConfiguration = new Configuration();
        restartConfiguration.set(
                RestartStrategyOptions.RESTART_STRATEGY,
                RestartStrategyOptions.RestartStrategyType.FIXED_DELAY.getMainValue());
        restartConfiguration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);

        // Re-arm the static one-shot failure flag before the job is submitted.
        OneTimeFailingReceiverWithPartitionException.hasFailed.set(false);

        final JobGraph threeStageJob = createJobGraphWithThreeStages(2);
        final JobResult result = executeSchedulingTest(threeStageJob, 2, restartConfiguration);

        final boolean jobFailed = result.getSerializedThrowable().isPresent();
        Assert.assertThat(jobFailed, CoreMatchers.is(false));
    }

    /**
     * Runs the default job graph (built with parallelism argument 2) with a fresh configuration.
     *
     * @param i number of slots offered by the single task manager
     * @return the result of the executed job
     */
    private JobResult executeSchedulingTest(int i) throws Exception {
        final JobGraph defaultJobGraph = createJobGraph(2);
        final Configuration freshConfiguration = new Configuration();
        return executeSchedulingTest(defaultJobGraph, i, freshConfiguration);
    }

    /**
     * Starts a mini cluster with one task manager, submits the given job and waits for its
     * result. The cluster is closed before this method returns, whether the job completed or an
     * exception was thrown.
     *
     * <p>The passed configuration is modified in place: the slot request timeout is set to
     * 30000 and the scheduler type to {@code Default}.
     *
     * @param jobGraph the job to submit
     * @param i number of slots offered by the single task manager
     * @param configuration configuration used for both the mini cluster and the client
     * @return the result of the executed job
     * @throws Exception if starting the cluster, submitting the job or waiting for the result
     *     fails
     */
    private JobResult executeSchedulingTest(JobGraph jobGraph, int i, Configuration configuration) throws Exception {
        configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 30000L);
        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Default);

        final MiniClusterConfiguration miniClusterConfiguration =
                new MiniClusterConfiguration.Builder()
                        .withRandomPorts()
                        .setConfiguration(configuration)
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(i)
                        .build();

        // try-with-resources closes the cluster on every path and attaches a failing close()
        // as a suppressed exception instead of letting it replace the primary failure.
        try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
            miniCluster.start();

            final MiniClusterClient miniClusterClient =
                    new MiniClusterClient(configuration, miniCluster);

            // Block until the submission is acknowledged, then until the job has finished.
            final JobID jobId = miniClusterClient.submitJob(jobGraph).get();
            return miniClusterClient.requestJobResult(jobId).get();
        }
    }

    /**
     * Builds a batch job graph with two sources feeding one sink:
     *
     * <ul>
     *   <li>"source1" ({@link PipelinedSender}, parallelism {@code 2 * i}) is connected to the
     *       sink POINTWISE through a PIPELINED result and shares the sink's slot sharing group;
     *   <li>"source2" ({@link NoOpInvokable}, parallelism {@code i}) is connected to the sink
     *       ALL_TO_ALL through a BLOCKING result and has its own slot sharing group;
     *   <li>"sink" ({@link Receiver}, parallelism {@code i}).
     * </ul>
     *
     * @param i base parallelism of the job
     * @return the assembled batch job graph
     */
    private JobGraph createJobGraph(int i) {
        final int parallelism = i;

        final SlotSharingGroup pipelinedRegionGroup = new SlotSharingGroup();
        final JobVertex pipelinedSource = new JobVertex("source1");
        pipelinedSource.setInvokableClass(PipelinedSender.class);
        pipelinedSource.setParallelism(parallelism * 2);
        pipelinedSource.setSlotSharingGroup(pipelinedRegionGroup);

        final SlotSharingGroup blockingSourceGroup = new SlotSharingGroup();
        final JobVertex blockingSource = new JobVertex("source2");
        blockingSource.setInvokableClass(NoOpInvokable.class);
        blockingSource.setParallelism(parallelism);
        blockingSource.setSlotSharingGroup(blockingSourceGroup);

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(Receiver.class);
        sink.setParallelism(parallelism);
        sink.setSlotSharingGroup(pipelinedRegionGroup);

        // Input order is significant: the pipelined input is wired first, the blocking one second.
        sink.connectNewDataSetAsInput(
                pipelinedSource, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        sink.connectNewDataSetAsInput(
                blockingSource, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        final JobVertex[] vertices = new JobVertex[]{pipelinedSource, blockingSource, sink};
        return JobGraphTestUtils.batchJobGraph(vertices);
    }

    /**
     * Builds a batch job graph with three chained stages, each in its own slot sharing group and
     * each with parallelism {@code i}:
     *
     * <pre>
     * "source" --POINTWISE/BLOCKING--&gt; "map" --ALL_TO_ALL/BLOCKING--&gt; "sink"
     * </pre>
     *
     * <p>"source" and "map" run {@link NoOpInvokable}; "sink" runs
     * {@link OneTimeFailingReceiverWithPartitionException}.
     *
     * @param i parallelism applied to every vertex
     * @return the assembled batch job graph
     */
    private JobGraph createJobGraphWithThreeStages(int i) {
        final int parallelism = i;

        final SlotSharingGroup sourceGroup = new SlotSharingGroup();
        final JobVertex source = new JobVertex("source");
        source.setInvokableClass(NoOpInvokable.class);
        source.setParallelism(parallelism);
        source.setSlotSharingGroup(sourceGroup);

        final SlotSharingGroup mapGroup = new SlotSharingGroup();
        final JobVertex map = new JobVertex("map");
        map.setInvokableClass(NoOpInvokable.class);
        map.setParallelism(parallelism);
        map.setSlotSharingGroup(mapGroup);

        final SlotSharingGroup sinkGroup = new SlotSharingGroup();
        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(OneTimeFailingReceiverWithPartitionException.class);
        sink.setParallelism(parallelism);
        sink.setSlotSharingGroup(sinkGroup);

        // Both edges are blocking, so each stage consumes a fully produced result.
        map.connectNewDataSetAsInput(
                source, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        sink.connectNewDataSetAsInput(
                map, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        final JobVertex[] vertices = new JobVertex[]{source, map, sink};
        return JobGraphTestUtils.batchJobGraph(vertices);
    }
}
