package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.types.IntValue;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({ResultPartitionWriter.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.class */
public class RecordWriterTest {
    @Test
    public void testClearBuffersAfterInterruptDuringBlockingBufferRequest() throws Exception {
        ExecutorService executorService = null;
        try {
            executorService = Executors.newSingleThreadExecutor();
            final CountDownLatch countDownLatch = new CountDownLatch(2);
            final Buffer buffer = (Buffer) Mockito.spy(TestBufferFactory.createBuffer(4));
            Answer<Buffer> answer = new Answer<Buffer>() { // from class: org.apache.flink.runtime.io.network.api.writer.RecordWriterTest.1
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Buffer m45answer(InvocationOnMock invocationOnMock) throws Throwable {
                    countDownLatch.countDown();
                    if (countDownLatch.getCount() == 1) {
                        return buffer;
                    }
                    Object obj = new Object();
                    synchronized (obj) {
                        while (true) {
                            obj.wait();
                        }
                    }
                }
            };
            BufferProvider bufferProvider = (BufferProvider) Mockito.mock(BufferProvider.class);
            Mockito.when(bufferProvider.requestBufferBlocking()).thenAnswer(answer);
            ResultPartitionWriter createResultPartitionWriter = createResultPartitionWriter(bufferProvider);
            final RecordWriter recordWriter = new RecordWriter(createResultPartitionWriter);
            Future submit = executorService.submit(new Callable<Void>() { // from class: org.apache.flink.runtime.io.network.api.writer.RecordWriterTest.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    IntValue intValue = new IntValue(0);
                    try {
                        recordWriter.emit(intValue);
                        recordWriter.flush();
                        recordWriter.emit(intValue);
                        return null;
                    } catch (InterruptedException e) {
                        recordWriter.clearBuffers();
                        return null;
                    }
                }
            });
            countDownLatch.await();
            submit.cancel(true);
            recordWriter.clearBuffers();
            ((BufferProvider) Mockito.verify(bufferProvider, Mockito.times(2))).requestBufferBlocking();
            ((ResultPartitionWriter) Mockito.verify(createResultPartitionWriter, Mockito.times(1))).writeBuffer((Buffer) Matchers.any(Buffer.class), Matchers.anyInt());
            Assert.assertTrue("Buffer not recycled.", buffer.isRecycled());
            ((Buffer) Mockito.verify(buffer, Mockito.times(1))).recycle();
            if (executorService != null) {
                executorService.shutdown();
            }
        } catch (Throwable th) {
            if (executorService != null) {
                executorService.shutdown();
            }
            throw th;
        }
    }

    @Test
    public void testClearBuffersAfterExceptionInPartitionWriter() throws Exception {
        NetworkBufferPool networkBufferPool = null;
        BufferPool bufferPool = null;
        try {
            networkBufferPool = new NetworkBufferPool(1, 1024, MemoryType.HEAP);
            bufferPool = (BufferPool) Mockito.spy(networkBufferPool.createBufferPool(1, true));
            ResultPartitionWriter resultPartitionWriter = (ResultPartitionWriter) Mockito.mock(ResultPartitionWriter.class);
            Mockito.when(resultPartitionWriter.getBufferProvider()).thenReturn(Preconditions.checkNotNull(bufferPool));
            Mockito.when(Integer.valueOf(resultPartitionWriter.getNumberOfOutputChannels())).thenReturn(1);
            ((ResultPartitionWriter) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.io.network.api.writer.RecordWriterTest.3
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Void m46answer(InvocationOnMock invocationOnMock) throws Throwable {
                    ((Buffer) invocationOnMock.getArguments()[0]).recycle();
                    throw new RuntimeException("Expected test Exception");
                }
            }).when(resultPartitionWriter)).writeBuffer((Buffer) Matchers.any(Buffer.class), Matchers.anyInt());
            RecordWriter recordWriter = new RecordWriter(resultPartitionWriter);
            while (true) {
                try {
                    recordWriter.emit(new IntValue(0));
                } catch (Exception e) {
                    recordWriter.clearBuffers();
                    ((ResultPartitionWriter) Mockito.verify(resultPartitionWriter, Mockito.times(1))).writeBuffer((Buffer) Matchers.any(Buffer.class), Matchers.anyInt());
                    ((BufferPool) Mockito.verify(bufferPool, Mockito.times(1))).requestBufferBlocking();
                    try {
                        recordWriter.emit(new IntValue(0));
                        recordWriter.flush();
                        Assert.fail("Did not throw expected test Exception");
                    } catch (Exception e2) {
                        recordWriter.clearBuffers();
                    }
                    ((ResultPartitionWriter) Mockito.verify(resultPartitionWriter, Mockito.times(2))).writeBuffer((Buffer) Matchers.any(Buffer.class), Matchers.anyInt());
                    ((BufferPool) Mockito.verify(bufferPool, Mockito.times(2))).requestBufferBlocking();
                    while (true) {
                        try {
                            recordWriter.broadcastEmit(new IntValue(0));
                        } catch (Exception e3) {
                            recordWriter.clearBuffers();
                            ((ResultPartitionWriter) Mockito.verify(resultPartitionWriter, Mockito.times(3))).writeBuffer((Buffer) Matchers.any(Buffer.class), Matchers.anyInt());
                            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(3))).requestBufferBlocking();
                            try {
                                recordWriter.emit(new IntValue(0));
                                recordWriter.sendEndOfSuperstep();
                                Assert.fail("Did not throw expected test Exception");
                            } catch (Exception e4) {
                                recordWriter.clearBuffers();
                            }
                            ((ResultPartitionWriter) Mockito.verify(resultPartitionWriter, Mockito.times(4))).writeBuffer((Buffer) Matchers.any(Buffer.class), Matchers.anyInt());
                            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(4))).requestBufferBlocking();
                            try {
                                recordWriter.emit(new IntValue(0));
                                recordWriter.broadcastEvent(new TestTaskEvent());
                                Assert.fail("Did not throw expected test Exception");
                            } catch (Exception e5) {
                                recordWriter.clearBuffers();
                            }
                            ((ResultPartitionWriter) Mockito.verify(resultPartitionWriter, Mockito.times(5))).writeBuffer((Buffer) Matchers.any(Buffer.class), Matchers.anyInt());
                            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(5))).requestBufferBlocking();
                            if (bufferPool != null) {
                                Assert.assertEquals(1L, bufferPool.getNumberOfAvailableMemorySegments());
                                bufferPool.lazyDestroy();
                            }
                            if (networkBufferPool != null) {
                                Assert.assertEquals(1L, networkBufferPool.getNumberOfAvailableMemorySegments());
                                networkBufferPool.destroy();
                                return;
                            }
                            return;
                        }
                    }
                }
            }
        } catch (Throwable th) {
            if (bufferPool != null) {
                Assert.assertEquals(1L, bufferPool.getNumberOfAvailableMemorySegments());
                bufferPool.lazyDestroy();
            }
            if (networkBufferPool != null) {
                Assert.assertEquals(1L, networkBufferPool.getNumberOfAvailableMemorySegments());
                networkBufferPool.destroy();
            }
            throw th;
        }
    }

    @Test
    public void testSerializerClearedAfterClearBuffers() throws Exception {
        ResultPartitionWriter createResultPartitionWriter = createResultPartitionWriter(createBufferProvider(TestBufferFactory.createBuffer(16)));
        RecordWriter recordWriter = new RecordWriter(createResultPartitionWriter);
        recordWriter.emit(new IntValue(0));
        ((ResultPartitionWriter) Mockito.verify(createResultPartitionWriter, Mockito.never())).writeBuffer((Buffer) Matchers.any(Buffer.class), Matchers.anyInt());
        recordWriter.clearBuffers();
        recordWriter.flush();
    }

    private BufferProvider createBufferProvider(Buffer... bufferArr) throws IOException, InterruptedException {
        BufferProvider bufferProvider = (BufferProvider) Mockito.mock(BufferProvider.class);
        for (Buffer buffer : bufferArr) {
            Mockito.when(bufferProvider.requestBufferBlocking()).thenReturn(buffer);
        }
        return bufferProvider;
    }

    private ResultPartitionWriter createResultPartitionWriter(BufferProvider bufferProvider) throws IOException {
        ResultPartitionWriter resultPartitionWriter = (ResultPartitionWriter) Mockito.mock(ResultPartitionWriter.class);
        Mockito.when(resultPartitionWriter.getBufferProvider()).thenReturn(Preconditions.checkNotNull(bufferProvider));
        Mockito.when(Integer.valueOf(resultPartitionWriter.getNumberOfOutputChannels())).thenReturn(1);
        ((ResultPartitionWriter) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.io.network.api.writer.RecordWriterTest.4
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m47answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((Buffer) invocationOnMock.getArguments()[0]).recycle();
                return null;
            }
        }).when(resultPartitionWriter)).writeBuffer((Buffer) Matchers.any(Buffer.class), Matchers.anyInt());
        return resultPartitionWriter;
    }
}
