package org.apache.flink.runtime.io.network.partition;

import akka.actor.ActorSystem;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.io.network.api.reader.BufferReader;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.class */
public class PartialConsumePipelinedResultTest {
    private static final int NUMBER_OF_TMS = 1;
    private static final int NUMBER_OF_SLOTS_PER_TM = 1;
    private static final int PARALLELISM = 1;
    private static final int NUMBER_OF_NETWORK_BUFFERS = 128;
    private static TestingCluster flink;
    private static ActorSystem jobClient;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest$SingleBufferReceiver.class */
    public static class SingleBufferReceiver extends AbstractInvokable {
        @Override // org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
        public void registerInputOutput() {
        }

        @Override // org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
        public void invoke() throws Exception {
            new BufferReader(getEnvironment().getInputGate(0)).getNextBuffer().recycle();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest$SlowBufferSender.class */
    public static class SlowBufferSender extends AbstractInvokable {
        @Override // org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
        public void registerInputOutput() {
        }

        @Override // org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
        public void invoke() throws Exception {
            ResultPartitionWriter writer = getEnvironment().getWriter(0);
            for (int i = 0; i < 8; i++) {
                writer.writeBuffer(writer.getBufferProvider().requestBufferBlocking(), 0);
                Thread.sleep(50L);
            }
        }
    }

    @BeforeClass
    public static void setUp() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
        configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
        configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
        configuration.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 128);
        flink = new TestingCluster(configuration, true);
        jobClient = JobClient.startJobClientActorSystem(flink.configuration());
    }

    @AfterClass
    public static void tearDown() throws Exception {
        flink.stop();
    }

    @Test
    public void testPartialConsumePipelinedResultReceiver() throws Exception {
        JobVertex jobVertex = new JobVertex("Sender");
        jobVertex.setInvokableClass(SlowBufferSender.class);
        jobVertex.setParallelism(1);
        JobVertex jobVertex2 = new JobVertex("Receiver");
        jobVertex2.setInvokableClass(SingleBufferReceiver.class);
        jobVertex2.setParallelism(1);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", jobVertex, jobVertex2);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup(jobVertex.getID(), jobVertex2.getID());
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        JobClient.submitJobAndWait(jobClient, flink.getJobManager(), jobGraph, TestingUtils.TESTING_DURATION(), false);
    }
}
