/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.common.io;

import java.io.IOException;
import java.time.Duration;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.api.common.io.OutputFormatBase;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class OutputFormatBaseTest {
    private static final Duration DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Duration.ofMillis(Long.MAX_VALUE);

    OutputFormatBaseTest() {
    }

    @Test
    void testSuccessfulWrite() throws Exception {
        try (TestOutputFormat testOutputFormat = OutputFormatBaseTest.createOpenedTestOutputFormat();){
            testOutputFormat.enqueueCompletableFuture(CompletableFuture.completedFuture(null));
            int originalPermits = testOutputFormat.getAvailablePermits();
            Assertions.assertThat((int)originalPermits).isPositive();
            Assertions.assertThat((int)testOutputFormat.getAcquiredPermits()).isZero();
            testOutputFormat.writeRecord("hello");
            Assertions.assertThat((int)testOutputFormat.getAvailablePermits()).isEqualTo(originalPermits);
            Assertions.assertThat((int)testOutputFormat.getAcquiredPermits()).isZero();
        }
    }

    @Test
    void testThrowErrorOnClose() throws Exception {
        TestOutputFormat testOutputFormat = OutputFormatBaseTest.createTestOutputFormat();
        testOutputFormat.open(1, 1);
        RuntimeException cause = new RuntimeException();
        testOutputFormat.enqueueCompletableFuture(FutureUtils.completedExceptionally((Throwable)cause));
        testOutputFormat.writeRecord("none");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ((TestOutputFormat)testOutputFormat).close()).isInstanceOf(IOException.class)).hasCauseReference((Throwable)cause);
    }

    @Test
    void testThrowErrorOnWrite() throws Exception {
        try (TestOutputFormat testOutputFormat = OutputFormatBaseTest.createOpenedTestOutputFormat();){
            RuntimeException cause = new RuntimeException();
            testOutputFormat.enqueueCompletableFuture(FutureUtils.completedExceptionally((Throwable)cause));
            testOutputFormat.writeRecord("none");
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> testOutputFormat.writeRecord("none"), (String)"Sending of second value should have failed.", (Object[])new Object[0]).isInstanceOf(IOException.class)).hasCauseReference((Throwable)cause);
            Assertions.assertThat((int)testOutputFormat.getAcquiredPermits()).isZero();
        }
    }

    @Test
    void testWaitForPendingUpdatesOnClose() throws Exception {
        try (final TestOutputFormat testOutputFormat = OutputFormatBaseTest.createOpenedTestOutputFormat();){
            CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
            testOutputFormat.enqueueCompletableFuture(completableFuture);
            testOutputFormat.writeRecord("hello");
            Assertions.assertThat((int)testOutputFormat.getAcquiredPermits()).isOne();
            CheckedThread checkedThread = new CheckedThread("Flink-OutputFormatBaseTest"){

                public void go() throws Exception {
                    testOutputFormat.close();
                }
            };
            checkedThread.start();
            while (checkedThread.getState() != Thread.State.TIMED_WAITING) {
                Thread.sleep(5L);
            }
            Assertions.assertThat((int)testOutputFormat.getAcquiredPermits()).isOne();
            completableFuture.complete(null);
            checkedThread.sync();
            Assertions.assertThat((int)testOutputFormat.getAcquiredPermits()).isZero();
        }
    }

    @Test
    void testReleaseOnSuccess() throws Exception {
        try (TestOutputFormat openedTestOutputFormat = OutputFormatBaseTest.createOpenedTestOutputFormat();){
            Assertions.assertThat((int)openedTestOutputFormat.getAvailablePermits()).isOne();
            Assertions.assertThat((int)openedTestOutputFormat.getAcquiredPermits()).isZero();
            CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
            openedTestOutputFormat.enqueueCompletableFuture(completableFuture);
            openedTestOutputFormat.writeRecord("hello");
            Assertions.assertThat((int)openedTestOutputFormat.getAvailablePermits()).isZero();
            Assertions.assertThat((int)openedTestOutputFormat.getAcquiredPermits()).isOne();
            completableFuture.complete(null);
            Assertions.assertThat((int)openedTestOutputFormat.getAvailablePermits()).isOne();
            Assertions.assertThat((int)openedTestOutputFormat.getAcquiredPermits()).isZero();
        }
    }

    @Test
    void testReleaseOnFailure() throws Exception {
        TestOutputFormat testOutputFormat = OutputFormatBaseTest.createOpenedTestOutputFormat();
        Assertions.assertThat((int)testOutputFormat.getAvailablePermits()).isOne();
        Assertions.assertThat((int)testOutputFormat.getAcquiredPermits()).isZero();
        CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        testOutputFormat.enqueueCompletableFuture(completableFuture);
        testOutputFormat.writeRecord("none");
        Assertions.assertThat((int)testOutputFormat.getAvailablePermits()).isZero();
        Assertions.assertThat((int)testOutputFormat.getAcquiredPermits()).isOne();
        completableFuture.completeExceptionally(new RuntimeException());
        Assertions.assertThat((int)testOutputFormat.getAvailablePermits()).isOne();
        Assertions.assertThat((int)testOutputFormat.getAcquiredPermits()).isZero();
        Assertions.assertThatThrownBy(() -> ((TestOutputFormat)testOutputFormat).close());
    }

    @Test
    void testReleaseOnThrowingSend() throws Exception {
        Function<String, CompletionStage<Void>> failingSendFunction = ignoredRecord -> {
            throw new RuntimeException("expected");
        };
        try (TestOutputFormat testOutputFormat = OutputFormatBaseTest.createOpenedTestOutputFormat(failingSendFunction);){
            Assertions.assertThat((int)testOutputFormat.getAvailablePermits()).isOne();
            Assertions.assertThat((int)testOutputFormat.getAcquiredPermits()).isZero();
            try {
                testOutputFormat.writeRecord("none");
            }
            catch (RuntimeException runtimeException) {
                // empty catch block
            }
            Assertions.assertThat((int)testOutputFormat.getAvailablePermits()).isOne();
            Assertions.assertThat((int)testOutputFormat.getAcquiredPermits()).isZero();
        }
    }

    @Test
    void testMaxConcurrentRequestsReached() throws Exception {
        try (TestOutputFormat testOutputFormat = OutputFormatBaseTest.createOpenedTestOutputFormat(Duration.ofMillis(1L));){
            CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
            testOutputFormat.enqueueCompletableFuture(completableFuture);
            testOutputFormat.enqueueCompletableFuture(completableFuture);
            testOutputFormat.writeRecord("writeRecord #1");
            Assertions.assertThatThrownBy(() -> testOutputFormat.writeRecord("writeRecord #2"), (String)"Sending value should have experienced a TimeoutException.", (Object[])new Object[0]).hasCauseInstanceOf(TimeoutException.class);
            completableFuture.complete(null);
        }
    }

    private static TestOutputFormat createTestOutputFormat() {
        TestOutputFormat testOutputFormat = new TestOutputFormat(1, DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT);
        testOutputFormat.configure(new Configuration());
        return testOutputFormat;
    }

    private static TestOutputFormat createOpenedTestOutputFormat() {
        return OutputFormatBaseTest.createOpenedTestOutputFormat(DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT);
    }

    private static TestOutputFormat createOpenedTestOutputFormat(Duration maxConcurrentRequestsTimeout) {
        TestOutputFormat testOutputFormat = new TestOutputFormat(1, maxConcurrentRequestsTimeout);
        testOutputFormat.configure(new Configuration());
        testOutputFormat.open(1, 1);
        return testOutputFormat;
    }

    private static TestOutputFormat createOpenedTestOutputFormat(Function<String, CompletionStage<Void>> sendFunction) {
        TestOutputFormat testOutputFormat = new TestOutputFormat(1, DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT, sendFunction);
        testOutputFormat.configure(new Configuration());
        testOutputFormat.open(1, 1);
        return testOutputFormat;
    }

    private static class TestOutputFormat
    extends OutputFormatBase<String, Void>
    implements AutoCloseable {
        private static final long serialVersionUID = 6646648756749403023L;
        private final Queue<CompletionStage<Void>> tasksQueue = new LinkedList<CompletionStage<Void>>();
        @Nullable
        private final Function<String, CompletionStage<Void>> sendFunction;

        private TestOutputFormat(int maxConcurrentRequests, Duration maxConcurrentRequestsTimeout) {
            super(maxConcurrentRequests, maxConcurrentRequestsTimeout);
            this.sendFunction = null;
        }

        private TestOutputFormat(int maxConcurrentRequests, Duration maxConcurrentRequestsTimeout, Function<String, CompletionStage<Void>> sendFunction) {
            super(maxConcurrentRequests, maxConcurrentRequestsTimeout);
            this.sendFunction = sendFunction;
        }

        protected CompletionStage<Void> send(String record) {
            return this.sendFunction == null ? this.tasksQueue.poll() : this.sendFunction.apply(record);
        }

        void enqueueCompletableFuture(CompletableFuture<Void> completableFuture) {
            Preconditions.checkNotNull(completableFuture);
            this.tasksQueue.offer(completableFuture);
        }

        public void configure(Configuration parameters) {
        }
    }
}

