/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionReadScheduler;
import org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
    private static final int bufferSize = 1024;
    private static final byte[] dataBytes = new byte[1024];
    private static final int totalBytes = 1024;
    private static final int numThreads = 4;
    private static final int numSubpartitions = 10;
    private static final int numBuffersPerSubpartition = 10;
    private PartitionedFile partitionedFile;
    private BatchShuffleReadBufferPool bufferPool;
    private ExecutorService executor;
    private SortMergeResultPartitionReadScheduler readScheduler;
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public Timeout timeout = new Timeout(60L, TimeUnit.SECONDS);

    @Before
    public void before() throws Exception {
        Random random = new Random();
        random.nextBytes(dataBytes);
        this.partitionedFile = PartitionTestUtils.createPartitionedFile(this.temporaryFolder.newFile().getAbsolutePath(), 10, 10, 1024, dataBytes);
        this.bufferPool = new BatchShuffleReadBufferPool(1024L, 1024);
        this.executor = Executors.newFixedThreadPool(4);
        this.readScheduler = new SortMergeResultPartitionReadScheduler(this.bufferPool, (Executor)this.executor, (Object)this);
    }

    @After
    public void after() {
        this.partitionedFile.deleteQuietly();
        this.bufferPool.destroy();
        this.executor.shutdown();
    }

    @Test
    public void testCreateSubpartitionReader() throws Exception {
        SortMergeSubpartitionReader subpartitionReader = this.readScheduler.crateSubpartitionReader((BufferAvailabilityListener)new NoOpBufferAvailablityListener(), 0, this.partitionedFile);
        Assert.assertTrue((boolean)this.readScheduler.isRunning());
        Assert.assertTrue((boolean)this.readScheduler.getDataFileChannel().isOpen());
        Assert.assertTrue((boolean)this.readScheduler.getIndexFileChannel().isOpen());
        int numBuffersRead = 0;
        while (numBuffersRead < 10) {
            ResultSubpartition.BufferAndBacklog bufferAndBacklog = subpartitionReader.getNextBuffer();
            if (bufferAndBacklog == null) continue;
            Buffer buffer = bufferAndBacklog.buffer();
            Assert.assertEquals((Object)ByteBuffer.wrap(dataBytes), (Object)buffer.getNioBufferReadable());
            buffer.recycleBuffer();
            ++numBuffersRead;
        }
    }

    @Test
    public void testOnSubpartitionReaderError() throws Exception {
        SortMergeSubpartitionReader subpartitionReader = this.readScheduler.crateSubpartitionReader((BufferAvailabilityListener)new NoOpBufferAvailablityListener(), 0, this.partitionedFile);
        subpartitionReader.releaseAllResources();
        this.waitUntilReadFinish();
        this.assertAllResourcesReleased();
    }

    @Test
    public void testReleaseWhileReading() throws Exception {
        SortMergeSubpartitionReader subpartitionReader = this.readScheduler.crateSubpartitionReader((BufferAvailabilityListener)new NoOpBufferAvailablityListener(), 0, this.partitionedFile);
        Thread.sleep(1000L);
        this.readScheduler.release();
        Assert.assertNotNull((Object)subpartitionReader.getFailureCause());
        Assert.assertTrue((boolean)subpartitionReader.isReleased());
        Assert.assertEquals((long)0L, (long)subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
        Assert.assertTrue((boolean)subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
        this.readScheduler.getReleaseFuture().get();
        this.assertAllResourcesReleased();
    }

    @Test(expected=IllegalStateException.class)
    public void testCreateSubpartitionReaderAfterReleased() throws Exception {
        this.readScheduler.release();
        try {
            this.readScheduler.crateSubpartitionReader((BufferAvailabilityListener)new NoOpBufferAvailablityListener(), 0, this.partitionedFile);
        }
        finally {
            this.assertAllResourcesReleased();
        }
    }

    @Test
    public void testOnDataReadError() throws Exception {
        SortMergeSubpartitionReader subpartitionReader = this.readScheduler.crateSubpartitionReader((BufferAvailabilityListener)new NoOpBufferAvailablityListener(), 0, this.partitionedFile);
        this.readScheduler.getDataFileChannel().close();
        while (!subpartitionReader.isReleased()) {
            ResultSubpartition.BufferAndBacklog bufferAndBacklog = subpartitionReader.getNextBuffer();
            if (bufferAndBacklog == null) continue;
            bufferAndBacklog.buffer().recycleBuffer();
        }
        this.waitUntilReadFinish();
        Assert.assertNotNull((Object)subpartitionReader.getFailureCause());
        Assert.assertTrue((boolean)subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
        this.assertAllResourcesReleased();
    }

    @Test
    public void testOnReadBufferRequestError() throws Exception {
        SortMergeSubpartitionReader subpartitionReader = this.readScheduler.crateSubpartitionReader((BufferAvailabilityListener)new NoOpBufferAvailablityListener(), 0, this.partitionedFile);
        this.bufferPool.destroy();
        this.waitUntilReadFinish();
        Assert.assertTrue((boolean)subpartitionReader.isReleased());
        Assert.assertNotNull((Object)subpartitionReader.getFailureCause());
        Assert.assertTrue((boolean)subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
        this.assertAllResourcesReleased();
    }

    private void assertAllResourcesReleased() {
        Assert.assertNull((Object)this.readScheduler.getDataFileChannel());
        Assert.assertNull((Object)this.readScheduler.getIndexFileChannel());
        Assert.assertFalse((boolean)this.readScheduler.isRunning());
        Assert.assertEquals((long)0L, (long)this.readScheduler.getNumPendingReaders());
        if (!this.bufferPool.isDestroyed()) {
            Assert.assertEquals((long)this.bufferPool.getNumTotalBuffers(), (long)this.bufferPool.getAvailableBuffers());
        }
    }

    private void waitUntilReadFinish() throws Exception {
        while (this.readScheduler.isRunning()) {
            Thread.sleep(100L);
        }
    }
}

