package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.state.ChannelPersistenceITCase;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.class */
public class ChannelStateWriterImplTest {
    private static final long CHECKPOINT_ID = 42;
    private static final String TASK_NAME = "test";

    @Test(expected = IllegalArgumentException.class)
    public void testAddEventBuffer() throws Exception {
        NetworkBuffer buffer = getBuffer();
        NetworkBuffer buffer2 = getBuffer();
        buffer2.setDataType(Buffer.DataType.EVENT_BUFFER);
        try {
            runWithSyncWorker(channelStateWriter -> {
                callStart(channelStateWriter);
                channelStateWriter.addInputData(42L, new InputChannelInfo(1, 1), 1, CloseableIterator.ofElements((v0) -> {
                    v0.recycleBuffer();
                }, new Buffer[]{buffer2, buffer}));
            });
        } finally {
            Assert.assertTrue(buffer.isRecycled());
        }
    }

    @Test
    public void testResultCompletion() throws IOException {
        ChannelStateWriterImpl openWriter = openWriter();
        Throwable th = null;
        try {
            callStart(openWriter);
            ChannelStateWriter.ChannelStateWriteResult andRemoveWriteResult = openWriter.getAndRemoveWriteResult(42L);
            Assert.assertFalse(andRemoveWriteResult.resultSubpartitionStateHandles.isDone());
            Assert.assertFalse(andRemoveWriteResult.inputChannelStateHandles.isDone());
            if (openWriter != null) {
                if (0 != 0) {
                    try {
                        openWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    openWriter.close();
                }
            }
            Assert.assertTrue(andRemoveWriteResult.inputChannelStateHandles.isDone());
            Assert.assertTrue(andRemoveWriteResult.resultSubpartitionStateHandles.isDone());
        } catch (Throwable th3) {
            if (openWriter != null) {
                if (0 != 0) {
                    try {
                        openWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    openWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAbort() throws Exception {
        NetworkBuffer buffer = getBuffer();
        runWithSyncWorker((channelStateWriter, syncChannelStateWriteRequestExecutor) -> {
            callStart(channelStateWriter);
            ChannelStateWriter.ChannelStateWriteResult andRemoveWriteResult = channelStateWriter.getAndRemoveWriteResult(42L);
            callAddInputData(channelStateWriter, buffer);
            callAbort(channelStateWriter);
            syncChannelStateWriteRequestExecutor.processAllRequests();
            Assert.assertTrue(andRemoveWriteResult.isDone());
            Assert.assertTrue(buffer.isRecycled());
        });
    }

    @Test(expected = IllegalArgumentException.class)
    public void testAbortClearsResults() throws Exception {
        getBuffer();
        runWithSyncWorker((channelStateWriter, syncChannelStateWriteRequestExecutor) -> {
            callStart(channelStateWriter);
            channelStateWriter.abort(42L, new TestException(), true);
            channelStateWriter.getAndRemoveWriteResult(42L);
        });
    }

    @Test
    public void testAbortDoesNotClearsResults() throws Exception {
        runWithSyncWorker((channelStateWriter, syncChannelStateWriteRequestExecutor) -> {
            callStart(channelStateWriter);
            callAbort(channelStateWriter);
            syncChannelStateWriteRequestExecutor.processAllRequests();
            channelStateWriter.getAndRemoveWriteResult(42L);
        });
    }

    @Test
    public void testAbortIgnoresMissing() throws Exception {
        runWithSyncWorker(this::callAbort);
    }

    @Test(expected = TestException.class)
    public void testBuffersRecycledOnError() throws Exception {
        unwrappingError(TestException.class, () -> {
            NetworkBuffer buffer = getBuffer();
            try {
                ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl("test", new ConcurrentHashMap(), failingWorker(), 5);
                Throwable th = null;
                try {
                    try {
                        channelStateWriterImpl.open();
                        callAddInputData(channelStateWriterImpl, buffer);
                        if (channelStateWriterImpl != null) {
                            if (0 != 0) {
                                try {
                                    channelStateWriterImpl.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                channelStateWriterImpl.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
                Assert.assertTrue(buffer.isRecycled());
            }
        });
    }

    @Test
    public void testBuffersRecycledOnClose() throws Exception {
        NetworkBuffer buffer = getBuffer();
        runWithSyncWorker(channelStateWriter -> {
            callStart(channelStateWriter);
            callAddInputData(channelStateWriter, buffer);
            Assert.assertFalse(buffer.isRecycled());
        });
        Assert.assertTrue(buffer.isRecycled());
    }

    @Test(expected = IllegalArgumentException.class)
    public void testNoAddDataAfterFinished() throws Exception {
        unwrappingError(IllegalArgumentException.class, () -> {
            runWithSyncWorker(channelStateWriter -> {
                callStart(channelStateWriter);
                callFinish(channelStateWriter);
                callAddInputData(channelStateWriter, new NetworkBuffer[0]);
            });
        });
    }

    @Test(expected = IllegalArgumentException.class)
    public void testAddDataNotStarted() throws Exception {
        unwrappingError(IllegalArgumentException.class, () -> {
            runWithSyncWorker(channelStateWriter -> {
                callAddInputData(channelStateWriter, new NetworkBuffer[0]);
            });
        });
    }

    @Test(expected = IllegalArgumentException.class)
    public void testFinishNotStarted() throws Exception {
        unwrappingError(IllegalArgumentException.class, () -> {
            runWithSyncWorker(this::callFinish);
        });
    }

    @Test(expected = IllegalArgumentException.class)
    public void testRethrowOnClose() throws Exception {
        unwrappingError(IllegalArgumentException.class, () -> {
            runWithSyncWorker(channelStateWriter -> {
                try {
                    callFinish(channelStateWriter);
                } catch (IllegalArgumentException e) {
                }
            });
        });
    }

    @Test(expected = TestException.class)
    public void testRethrowOnNextCall() throws Exception {
        SyncChannelStateWriteRequestExecutor syncChannelStateWriteRequestExecutor = new SyncChannelStateWriteRequestExecutor();
        ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl("test", new ConcurrentHashMap(), syncChannelStateWriteRequestExecutor, 5);
        channelStateWriterImpl.open();
        syncChannelStateWriteRequestExecutor.setThrown(new TestException());
        unwrappingError(TestException.class, () -> {
            callStart(channelStateWriterImpl);
        });
    }

    @Test(expected = IllegalStateException.class)
    public void testLimit() throws IOException {
        ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl("test", 0, ChannelPersistenceITCase.getStreamFactoryFactory(), 3);
        Throwable th = null;
        try {
            try {
                channelStateWriterImpl.open();
                for (int i = 0; i < 3; i++) {
                    channelStateWriterImpl.start(i, CheckpointOptions.forCheckpointWithDefaultLocation());
                }
                channelStateWriterImpl.start(3, CheckpointOptions.forCheckpointWithDefaultLocation());
                if (channelStateWriterImpl != null) {
                    if (0 == 0) {
                        channelStateWriterImpl.close();
                        return;
                    }
                    try {
                        channelStateWriterImpl.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (channelStateWriterImpl != null) {
                if (th != null) {
                    try {
                        channelStateWriterImpl.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    channelStateWriterImpl.close();
                }
            }
            throw th4;
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testStartNotOpened() throws Exception {
        unwrappingError(IllegalStateException.class, () -> {
            ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl("test", 0, ChannelPersistenceITCase.getStreamFactoryFactory());
            Throwable th = null;
            try {
                callStart(channelStateWriterImpl);
                if (channelStateWriterImpl != null) {
                    if (0 == 0) {
                        channelStateWriterImpl.close();
                        return;
                    }
                    try {
                        channelStateWriterImpl.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (channelStateWriterImpl != null) {
                    if (0 != 0) {
                        try {
                            channelStateWriterImpl.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        channelStateWriterImpl.close();
                    }
                }
                throw th3;
            }
        });
    }

    @Test(expected = IllegalStateException.class)
    public void testNoStartAfterClose() throws Exception {
        unwrappingError(IllegalStateException.class, () -> {
            ChannelStateWriterImpl openWriter = openWriter();
            openWriter.close();
            openWriter.start(42L, CheckpointOptions.forCheckpointWithDefaultLocation());
        });
    }

    @Test(expected = IllegalStateException.class)
    public void testNoAddDataAfterClose() throws Exception {
        unwrappingError(IllegalStateException.class, () -> {
            ChannelStateWriterImpl openWriter = openWriter();
            callStart(openWriter);
            openWriter.close();
            callAddInputData(openWriter, new NetworkBuffer[0]);
        });
    }

    private static <T extends Throwable> void unwrappingError(Class<T> cls, RunnableWithException runnableWithException) throws Exception {
        try {
            runnableWithException.run();
        } catch (Exception e) {
            throw ((Exception) ExceptionUtils.findThrowable(e, cls).map(th -> {
                return (Exception) th;
            }).orElse(e));
        }
    }

    private NetworkBuffer getBuffer() {
        return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(123, (Object) null), FreeingBufferRecycler.INSTANCE);
    }

    private ChannelStateWriteRequestExecutor failingWorker() {
        return new ChannelStateWriteRequestExecutor() { // from class: org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImplTest.1
            public void close() {
            }

            public void submit(ChannelStateWriteRequest channelStateWriteRequest) {
                throw new TestException();
            }

            public void submitPriority(ChannelStateWriteRequest channelStateWriteRequest) {
                throw new TestException();
            }

            public void start() throws IllegalStateException {
            }
        };
    }

    private void runWithSyncWorker(Consumer<ChannelStateWriter> consumer) throws Exception {
        runWithSyncWorker((channelStateWriter, syncChannelStateWriteRequestExecutor) -> {
            consumer.accept(channelStateWriter);
        });
    }

    private void runWithSyncWorker(BiConsumerWithException<ChannelStateWriter, SyncChannelStateWriteRequestExecutor, Exception> biConsumerWithException) throws Exception {
        SyncChannelStateWriteRequestExecutor syncChannelStateWriteRequestExecutor = new SyncChannelStateWriteRequestExecutor();
        Throwable th = null;
        try {
            ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl("test", new ConcurrentHashMap(), syncChannelStateWriteRequestExecutor, 5);
            Throwable th2 = null;
            try {
                channelStateWriterImpl.open();
                biConsumerWithException.accept(channelStateWriterImpl, syncChannelStateWriteRequestExecutor);
                syncChannelStateWriteRequestExecutor.processAllRequests();
                if (channelStateWriterImpl != null) {
                    if (0 != 0) {
                        try {
                            channelStateWriterImpl.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        channelStateWriterImpl.close();
                    }
                }
                if (syncChannelStateWriteRequestExecutor != null) {
                    if (0 == 0) {
                        syncChannelStateWriteRequestExecutor.close();
                        return;
                    }
                    try {
                        syncChannelStateWriteRequestExecutor.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            } catch (Throwable th5) {
                if (channelStateWriterImpl != null) {
                    if (0 != 0) {
                        try {
                            channelStateWriterImpl.close();
                        } catch (Throwable th6) {
                            th2.addSuppressed(th6);
                        }
                    } else {
                        channelStateWriterImpl.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (syncChannelStateWriteRequestExecutor != null) {
                if (0 != 0) {
                    try {
                        syncChannelStateWriteRequestExecutor.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    syncChannelStateWriteRequestExecutor.close();
                }
            }
            throw th7;
        }
    }

    private ChannelStateWriterImpl openWriter() {
        ChannelStateWriterImpl channelStateWriterImpl = new ChannelStateWriterImpl("test", 0, ChannelPersistenceITCase.getStreamFactoryFactory());
        channelStateWriterImpl.open();
        return channelStateWriterImpl;
    }

    private void callStart(ChannelStateWriter channelStateWriter) {
        channelStateWriter.start(42L, CheckpointOptions.forCheckpointWithDefaultLocation());
    }

    private void callAddInputData(ChannelStateWriter channelStateWriter, NetworkBuffer... networkBufferArr) {
        channelStateWriter.addInputData(42L, new InputChannelInfo(1, 1), 1, CloseableIterator.ofElements((v0) -> {
            v0.recycleBuffer();
        }, networkBufferArr));
    }

    private void callAbort(ChannelStateWriter channelStateWriter) {
        channelStateWriter.abort(42L, new TestException(), false);
    }

    private void callFinish(ChannelStateWriter channelStateWriter) {
        channelStateWriter.finishInput(42L);
        channelStateWriter.finishOutput(42L);
    }
}
