/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.types.LongValue;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={ResultPartitionWriter.class})
public class StreamRecordWriterTest {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPropagateAsyncFlushError() {
        try (FailingWriter testWriter = null;){
            ResultPartitionWriter mockResultPartitionWriter = StreamRecordWriterTest.getMockWriter(5);
            testWriter = new FailingWriter(mockResultPartitionWriter, (ChannelSelector)new RoundRobinChannelSelector(), 5L, 3);
            try {
                long deadline = System.currentTimeMillis() + 20000L;
                long l = 0L;
                while (System.currentTimeMillis() < deadline) {
                    testWriter.emit((IOReadableWritable)new LongValue(l++));
                }
                Assert.fail((String)"This should have failed with an exception");
            }
            catch (IOException e) {
                Assert.assertNotNull((Object)e.getCause());
                Assert.assertTrue((boolean)e.getCause().getMessage().contains("Test Exception"));
            }
        }
    }

    private static ResultPartitionWriter getMockWriter(int numPartitions) throws Exception {
        BufferProvider mockProvider = (BufferProvider)Mockito.mock(BufferProvider.class);
        Mockito.when((Object)mockProvider.requestBufferBlocking()).thenAnswer((Answer)new Answer<Buffer>(){

            public Buffer answer(InvocationOnMock invocation) {
                return new Buffer(MemorySegmentFactory.allocateUnpooledSegment((int)4096), FreeingBufferRecycler.INSTANCE);
            }
        });
        ResultPartitionWriter mockWriter = (ResultPartitionWriter)Mockito.mock(ResultPartitionWriter.class);
        Mockito.when((Object)mockWriter.getBufferProvider()).thenReturn((Object)mockProvider);
        Mockito.when((Object)mockWriter.getNumberOfOutputChannels()).thenReturn((Object)numPartitions);
        return mockWriter;
    }

    private static class FailingWriter<T extends IOReadableWritable>
    extends StreamRecordWriter<T> {
        private int flushesBeforeException;

        private FailingWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout, int flushesBeforeException) {
            super(writer, channelSelector, timeout);
            this.flushesBeforeException = flushesBeforeException;
        }

        public void flush() throws IOException {
            if (this.flushesBeforeException-- <= 0) {
                throw new IOException("Test Exception");
            }
            super.flush();
        }
    }
}

