package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.class */
/**
 * Tests the request accounting and the "all requests processed" notification of
 * {@link AsynchronousBufferFileWriter}.
 *
 * <p>Every test writer is created with a fresh {@link RequestQueue} that this class never hands
 * to an I/O thread. Requests are therefore completed by the tests themselves, by calling
 * {@code handleProcessedBuffer} directly (see {@link #handleRequest()}).
 */
public class AsynchronousBufferFileWriterTest {

    /** Number of rounds used to provoke the subscribe/handle race. */
    private static final int RACE_ITERATIONS = 50000;

    @Rule
    public ExpectedException exception = ExpectedException.none();
    private static final IOManager ioManager = new IOManagerAsync();
    private static final Buffer mockBuffer = Mockito.mock(Buffer.class);
    private AsynchronousBufferFileWriter writer;

    /** Shuts down the I/O manager shared by all tests of this class. */
    @AfterClass
    public static void shutdown() {
        ioManager.shutdown();
    }

    /** Creates a fresh writer on a new channel before each test. */
    @Before
    public void setUp() throws IOException {
        this.writer = new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue());
    }

    /** Adding a request must raise the outstanding-request count; handling it must lower it again. */
    @Test
    public void testAddAndHandleRequest() throws Exception {
        addRequest();
        Assert.assertEquals("Didn't increment number of outstanding requests.", 1L, this.writer.getNumberOfOutstandingRequests());
        handleRequest();
        Assert.assertEquals("Didn't decrement number of outstanding requests.", 0L, this.writer.getNumberOfOutstandingRequests());
    }

    /**
     * Writing to an already closed writer is expected to fail with an {@link IOException}, to
     * recycle the passed buffer and to leave the outstanding-request count untouched.
     */
    @Test
    public void testAddWithFailingWriter() throws Exception {
        AsynchronousBufferFileWriter closedWriter = new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue());
        closedWriter.close();

        this.exception.expect(IOException.class);

        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
        try {
            closedWriter.writeBlock(buffer);
        } finally {
            if (!buffer.isRecycled()) {
                // Release the segment ourselves before reporting the failure.
                buffer.recycle();
                Assert.fail("buffer not recycled");
            }
            Assert.assertEquals("Shouln't increment number of outstanding requests.", 0L, closedWriter.getNumberOfOutstandingRequests());
        }
    }

    /**
     * Subscribing must be refused while nothing is outstanding, accepted while a request is
     * pending, and the listener must be notified once that request has been handled.
     */
    @Test
    public void testSubscribe() throws Exception {
        TestNotificationListener listener = new TestNotificationListener();

        Assert.assertFalse("Allowed to subscribe w/o any outstanding requests.", this.writer.registerAllRequestsProcessedListener(listener));

        addRequest();
        Assert.assertTrue("Didn't allow to subscribe.", this.writer.registerAllRequestsProcessedListener(listener));

        handleRequest();
        Assert.assertEquals("Listener was not notified.", 1L, listener.getNumberOfNotifications());
    }

    /**
     * Closes the writer from a second thread while two requests are outstanding, handles both
     * requests on the test thread and checks that the registered listener is notified exactly once
     * and that the asynchronous close did not fail.
     */
    @Test
    public void testSubscribeAndClose() throws IOException, InterruptedException {
        final TestNotificationListener listener = new TestNotificationListener();
        final AtomicReference<Throwable> closeError = new AtomicReference<Throwable>();
        final CountDownLatch closeFinished = new CountDownLatch(1);

        addRequest();
        addRequest();
        this.writer.registerAllRequestsProcessedListener(listener);

        Thread asyncCloseThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    writer.close();
                } catch (Throwable t) {
                    closeError.set(t);
                } finally {
                    closeFinished.countDown();
                }
            }
        });
        asyncCloseThread.start();

        handleRequest();
        handleRequest();

        closeFinished.await();

        // Surface a failure of the asynchronous close instead of silently dropping it.
        Assert.assertNull("Asynchronous close of the writer failed.", closeError.get());
        Assert.assertEquals("Listener was not notified.", 1L, listener.getNumberOfNotifications());
    }

    /**
     * Repeatedly runs a listener registration and the handling of the single outstanding request
     * concurrently. Whatever the interleaving, a successful registration must result in exactly
     * one notification and a refused registration in none.
     */
    @Test
    public void testConcurrentSubscribeAndHandleRequest() throws Exception {
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        final TestNotificationListener listener = new TestNotificationListener();

        final Callable<Boolean> subscriber = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return writer.registerAllRequestsProcessedListener(listener);
            }
        };
        final Callable<Void> requestHandler = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                handleRequest();
                return null;
            }
        };

        // The executor must stay alive for all iterations, so it is shut down once, after the loop.
        try {
            for (int i = 0; i < RACE_ITERATIONS; i++) {
                listener.reset();
                addRequest();

                Future<Void> handleRequestFuture = executor.submit(requestHandler);
                Future<Boolean> subscribeFuture = executor.submit(subscriber);

                handleRequestFuture.get();

                try {
                    if (subscribeFuture.get()) {
                        Assert.assertEquals("Race: Successfully subscribed, but was never notified.", 1L, listener.getNumberOfNotifications());
                    } else {
                        Assert.assertEquals("Race: Never subscribed successfully, but was notified.", 0L, listener.getNumberOfNotifications());
                    }
                } catch (Throwable t) {
                    // Print the failing iteration to help reproduce the interleaving.
                    System.out.println(i);
                    Assert.fail(t.getMessage());
                }
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /** Issues one write request for the shared mock buffer on the writer under test. */
    private void addRequest() throws IOException {
        this.writer.writeBlock(mockBuffer);
    }

    /**
     * Reports one request for the shared mock buffer as processed without error, standing in for
     * the I/O thread that would normally do so.
     */
    public void handleRequest() {
        this.writer.handleProcessedBuffer(mockBuffer, (IOException) null);
    }
}
