package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(PowerMockRunner.class)
/* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.class */
public class AsynchronousFileIOChannelTest {
    private static final Logger LOG = LoggerFactory.getLogger(AsynchronousFileIOChannelTest.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest$FailingWriteRequest.class */
    public static class FailingWriteRequest implements WriteRequest {
        private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel;
        private final MemorySegment segment;

        protected FailingWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> asynchronousFileIOChannel, MemorySegment memorySegment) {
            this.channel = asynchronousFileIOChannel;
            this.segment = memorySegment;
        }

        public void write() throws IOException {
            throw new IOException();
        }

        public void requestDone(IOException iOException) {
            this.channel.handleProcessedBuffer(this.segment, iOException);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest$NoOpCallback.class */
    public static class NoOpCallback implements RequestDoneCallback<MemorySegment> {
        private NoOpCallback() {
        }

        public void requestSuccessful(MemorySegment memorySegment) {
        }

        public void requestFailed(MemorySegment memorySegment, IOException iOException) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest$TestAsyncFileIOChannel.class */
    public static class TestAsyncFileIOChannel extends AsynchronousFileIOChannel<Buffer, WriteRequest> {
        protected TestAsyncFileIOChannel(FileIOChannel.ID id, RequestQueue<WriteRequest> requestQueue, RequestDoneCallback<Buffer> requestDoneCallback, boolean z) throws IOException {
            super(id, requestQueue, requestDoneCallback, z);
        }

        int getNumberOfOutstandingRequests() {
            return this.requestsNotReturned.get();
        }
    }

    @Test
    public void testAllRequestsProcessedListenerNotification() throws Exception {
        IOManagerAsync iOManagerAsync = new IOManagerAsync();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        final Random random = new Random();
        final RequestQueue requestQueue = new RequestQueue();
        RequestDoneCallback requestDoneCallback = (RequestDoneCallback) Mockito.mock(RequestDoneCallback.class);
        final TestNotificationListener testNotificationListener = new TestNotificationListener();
        for (int i = 0; i < 10; i++) {
            try {
                final TestAsyncFileIOChannel testAsyncFileIOChannel = new TestAsyncFileIOChannel(iOManagerAsync.createChannel(), requestQueue, requestDoneCallback, true);
                final CountDownLatch countDownLatch = new CountDownLatch(3);
                final Buffer buffer = (Buffer) Mockito.mock(Buffer.class);
                final WriteRequest writeRequest = (WriteRequest) Mockito.mock(WriteRequest.class);
                Callable<Void> callable = new Callable<Void>() { // from class: org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannelTest.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        for (int i2 = 0; i2 < 10; i2++) {
                            AsynchronousFileIOChannelTest.LOG.debug("Starting run {}.", Integer.valueOf(i2 + 1));
                            for (int i3 = 0; i3 < 100; i3++) {
                                testAsyncFileIOChannel.addRequest(writeRequest);
                            }
                            AsynchronousFileIOChannelTest.LOG.debug("Added all ({}) requests of run {}.", 100, Integer.valueOf(i2 + 1));
                            int nextInt = random.nextInt(10);
                            AsynchronousFileIOChannelTest.LOG.debug("Sleeping for {} ms before next run.", Integer.valueOf(nextInt));
                            Thread.sleep(nextInt);
                        }
                        AsynchronousFileIOChannelTest.LOG.debug("Done. Closing channel.");
                        testAsyncFileIOChannel.close();
                        countDownLatch.countDown();
                        return null;
                    }
                };
                Callable<Void> callable2 = new Callable<Void>() { // from class: org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannelTest.2
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        for (int i2 = 0; i2 < 1000; i2++) {
                            requestQueue.take();
                            testAsyncFileIOChannel.handleProcessedBuffer(buffer, null);
                        }
                        AsynchronousFileIOChannelTest.LOG.debug("Processed all ({}) requests.", 100);
                        countDownLatch.countDown();
                        return null;
                    }
                };
                Callable<Void> callable3 = new Callable<Void>() { // from class: org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannelTest.3
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        while (true) {
                            int numberOfNotifications = testNotificationListener.getNumberOfNotifications();
                            if (testAsyncFileIOChannel.registerAllRequestsProcessedListener(testNotificationListener)) {
                                testNotificationListener.waitForNotification(numberOfNotifications);
                            } else if (testAsyncFileIOChannel.isClosed()) {
                                AsynchronousFileIOChannelTest.LOG.debug("Stopping listener. Channel closed.");
                                countDownLatch.countDown();
                                return null;
                            }
                        }
                    }
                };
                LinkedList linkedList = new LinkedList();
                linkedList.add(callable);
                linkedList.add(callable2);
                linkedList.add(callable3);
                Collections.shuffle(linkedList);
                Iterator it = linkedList.iterator();
                while (it.hasNext()) {
                    newFixedThreadPool.submit((Callable) it.next());
                }
                if (!countDownLatch.await(2L, TimeUnit.MINUTES)) {
                    Assert.fail("Test failed due to a timeout. This indicates a deadlock due to the waythat listeners are registered/notified in the asynchronous file I/Ochannel.");
                }
                testNotificationListener.reset();
            } finally {
                iOManagerAsync.shutdown();
                newFixedThreadPool.shutdown();
            }
        }
    }

    @Test
    public void testClosedButAddRequestAndRegisterListenerRace() throws Exception {
        IOManagerAsync iOManagerAsync = new IOManagerAsync();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        RequestQueue requestQueue = new RequestQueue();
        RequestDoneCallback requestDoneCallback = (RequestDoneCallback) Mockito.mock(RequestDoneCallback.class);
        final TestNotificationListener testNotificationListener = new TestNotificationListener();
        for (int i = 0; i < 1024; i++) {
            try {
                final TestAsyncFileIOChannel testAsyncFileIOChannel = new TestAsyncFileIOChannel(iOManagerAsync.createChannel(), requestQueue, requestDoneCallback, true);
                final CountDownLatch countDownLatch = new CountDownLatch(2);
                final WriteRequest writeRequest = (WriteRequest) Mockito.mock(WriteRequest.class);
                testAsyncFileIOChannel.close();
                Callable<Void> callable = new Callable<Void>() { // from class: org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannelTest.4
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        try {
                            testAsyncFileIOChannel.addRequest(writeRequest);
                            return null;
                        } catch (Throwable th) {
                            return null;
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                };
                Callable<Void> callable2 = new Callable<Void>() { // from class: org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannelTest.5
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        while (true) {
                            try {
                                int numberOfNotifications = testNotificationListener.getNumberOfNotifications();
                                if (testAsyncFileIOChannel.registerAllRequestsProcessedListener(testNotificationListener)) {
                                    testNotificationListener.waitForNotification(numberOfNotifications);
                                } else if (testAsyncFileIOChannel.isClosed()) {
                                    return null;
                                }
                            } finally {
                                countDownLatch.countDown();
                            }
                        }
                    }
                };
                newFixedThreadPool.submit(callable);
                newFixedThreadPool.submit(callable2);
                if (!countDownLatch.await(2L, TimeUnit.MINUTES)) {
                    Assert.fail("Test failed due to a timeout. This indicates a deadlock due to the waythat listeners are registered/notified in the asynchronous file I/Ochannel.");
                }
            } finally {
                iOManagerAsync.shutdown();
                newFixedThreadPool.shutdown();
            }
        }
    }

    @Test
    public void testClosingWaits() {
        IOManagerAsync iOManagerAsync = new IOManagerAsync();
        try {
            try {
                final MemorySegment allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(32768);
                final AtomicInteger atomicInteger = new AtomicInteger();
                final AtomicBoolean atomicBoolean = new AtomicBoolean();
                BlockChannelWriterWithCallback createBlockChannelWriter = iOManagerAsync.createBlockChannelWriter(iOManagerAsync.createChannel(), new RequestDoneCallback<MemorySegment>() { // from class: org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannelTest.6
                    public void requestSuccessful(MemorySegment memorySegment) {
                        atomicInteger.set(atomicInteger.get() + 1);
                        if (memorySegment != allocateUnpooledSegment) {
                            atomicBoolean.set(true);
                        }
                    }

                    public void requestFailed(MemorySegment memorySegment, IOException iOException) {
                        atomicBoolean.set(true);
                    }
                });
                for (int i = 0; i < 100; i++) {
                    try {
                        createBlockChannelWriter.writeBlock(allocateUnpooledSegment);
                    } catch (Throwable th) {
                        createBlockChannelWriter.closeAndDelete();
                        throw th;
                    }
                }
                createBlockChannelWriter.close();
                Assert.assertEquals(100L, atomicInteger.get());
                Assert.assertFalse(atomicBoolean.get());
                createBlockChannelWriter.closeAndDelete();
                iOManagerAsync.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
                iOManagerAsync.shutdown();
            }
        } catch (Throwable th2) {
            iOManagerAsync.shutdown();
            throw th2;
        }
    }

    @Test
    public void testExceptionForwardsToClose() {
        IOManagerAsync iOManagerAsync = new IOManagerAsync();
        try {
            testExceptionForwardsToClose(iOManagerAsync, 100, 1);
            testExceptionForwardsToClose(iOManagerAsync, 100, 50);
            testExceptionForwardsToClose(iOManagerAsync, 100, 100);
        } finally {
            iOManagerAsync.shutdown();
        }
    }

    private void testExceptionForwardsToClose(IOManagerAsync iOManagerAsync, int i, final int i2) {
        try {
            MemorySegment allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(32768);
            FileIOChannel.ID createChannel = iOManagerAsync.createChannel();
            AsynchronousBlockWriterWithCallback asynchronousBlockWriterWithCallback = new AsynchronousBlockWriterWithCallback(createChannel, iOManagerAsync.getWriteRequestQueue(createChannel), new NoOpCallback()) { // from class: org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannelTest.7
                private int numBlocks;

                public void writeBlock(MemorySegment memorySegment) throws IOException {
                    this.numBlocks++;
                    if (this.numBlocks != i2) {
                        super.writeBlock(memorySegment);
                    } else {
                        this.requestsNotReturned.incrementAndGet();
                        this.requestQueue.add(new FailingWriteRequest(this, memorySegment));
                    }
                }
            };
            for (int i3 = 0; i3 < i; i3++) {
                try {
                    asynchronousBlockWriterWithCallback.writeBlock(allocateUnpooledSegment);
                } catch (IOException e) {
                    try {
                        asynchronousBlockWriterWithCallback.closeAndDelete();
                    } catch (Throwable th) {
                    }
                } catch (Throwable th2) {
                    try {
                        asynchronousBlockWriterWithCallback.closeAndDelete();
                    } catch (Throwable th3) {
                    }
                    throw th2;
                }
            }
            asynchronousBlockWriterWithCallback.close();
            Assert.fail("did not forward exception");
            try {
                asynchronousBlockWriterWithCallback.closeAndDelete();
            } catch (Throwable th4) {
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }
}
