/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.core.fs;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.SafetyNetCloseableRegistry;
import org.apache.flink.util.AbstractAutoCloseableRegistry;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public abstract class AbstractAutoCloseableRegistryTest<C extends Closeable, E extends C, T> {
    /** Timeout in seconds; presumably intended as the bound for blocking waits in these tests (TODO confirm usage). */
    private static final int TEST_TIMEOUT_SECONDS = 10;
    /** Producer threads created by {@code setup(int)}; started by {@code startThreads()} and awaited by {@code joinThreads()}. */
    protected ProducerThread[] streamOpenThreads;
    /** Registry under test; assigned from {@code createRegistry()} in {@code setup(int)}. */
    protected AbstractAutoCloseableRegistry<C, E, T, IOException> closeableRegistry;
    /** Counter handed to every producer thread; {@code testClose()} expects it to read 0 after the registry was closed. */
    protected AtomicInteger unclosedCounter;

    /**
     * Registers the given closeable with the registry under test.
     *
     * @param var1 the closeable to register
     * @throws IOException if the registration is rejected; the tests in this class expect this
     *     once the registry has been (or is being) closed
     */
    protected abstract void registerCloseable(Closeable var1) throws IOException;

    /**
     * Creates the registry instance that the tests exercise; invoked once per {@code setup(int)} call.
     *
     * @return the registry to test
     */
    protected abstract AbstractAutoCloseableRegistry<C, E, T, IOException> createRegistry();

    /**
     * Creates one producer thread; {@code setup(int)} calls this once per slot of
     * {@code streamOpenThreads}, and {@code startThreads()} starts the returned thread later.
     *
     * @param var1 the registry under test, as passed by {@code setup(int)}
     * @param var2 the shared counter ({@code unclosedCounter}), as passed by {@code setup(int)}
     * @param var3 the {@code maxStreams} value given to {@code setup(int)}
     * @return the producer thread for this slot
     */
    protected abstract ProducerThread<C, E, T> createProducerThread(AbstractAutoCloseableRegistry<C, E, T, IOException> var1, AtomicInteger var2, int var3);

    /**
     * Builds a fresh fixture for one test.
     *
     * <p>First asserts that {@code SafetyNetCloseableRegistry} reports no running reaper thread,
     * then creates the registry, resets the shared counter to 0 and creates ten producer threads
     * (none of them is started here).
     *
     * @param maxStreams value forwarded unchanged to every {@code createProducerThread} call
     */
    public void setup(int maxStreams) {
        Assertions.assertThat(SafetyNetCloseableRegistry.isReaperThreadRunning()).isFalse();

        this.closeableRegistry = this.createRegistry();
        this.unclosedCounter = new AtomicInteger(0);
        this.streamOpenThreads = new ProducerThread[10];

        int slot = 0;
        while (slot < this.streamOpenThreads.length) {
            this.streamOpenThreads[slot] = this.createProducerThread(this.closeableRegistry, this.unclosedCounter, maxStreams);
            slot++;
        }
    }

    /** Starts every producer thread stored in {@code streamOpenThreads}, in array order. */
    protected void startThreads() {
        final ProducerThread[] producers = this.streamOpenThreads;
        for (int index = 0; index < producers.length; index++) {
            producers[index].start();
        }
    }

    /**
     * Waits, without a timeout, for every producer thread in {@code streamOpenThreads} to terminate.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected void joinThreads() throws InterruptedException {
        for (Thread producer : this.streamOpenThreads) {
            producer.join();
        }
    }

    /**
     * Closes the registry while the producer threads are running and checks the end state.
     *
     * <p>After the close and after all producers have terminated, the shared counter must be 0
     * and the registry must report no registered closeables. Registering a further closeable must
     * then fail with an {@link IOException}, leave that closeable closed, and must not change the
     * counter or the number of registered closeables.
     */
    @Test
    void testClose() throws Exception {
        this.setup(Integer.MAX_VALUE);
        this.startThreads();

        // Let the producers work for a while, requesting a GC before each pause.
        int remainingRounds = 5;
        while (remainingRounds > 0) {
            System.gc();
            Thread.sleep(40L);
            remainingRounds--;
        }

        this.closeableRegistry.close();
        this.joinThreads();

        Assertions.assertThat(this.unclosedCounter).hasValue(0);
        Assertions.assertThat(this.closeableRegistry.getNumberOfRegisteredCloseables()).isZero();

        // A registration attempt on the closed registry must be rejected and close the argument.
        final TestCloseable lateCloseable = new TestCloseable();
        Assertions.assertThatThrownBy(() -> this.registerCloseable(lateCloseable)).isInstanceOf(IOException.class);
        Assertions.assertThat(lateCloseable.isClosed()).isTrue();

        Assertions.assertThat(this.unclosedCounter).hasValue(0);
        Assertions.assertThat(this.closeableRegistry.getNumberOfRegisteredCloseables()).isZero();
    }

    /**
     * Verifies that a registry whose {@code close()} is still in progress already rejects new
     * registrations.
     *
     * <p>A {@code BlockingTestCloseable} is registered and the registry is closed from a separate
     * thread, which then blocks inside that closeable's {@code close()}. While it is blocked,
     * registering another closeable must fail with an {@link IOException}. After the blocked close
     * is released and the closer thread has finished, the rejected closeable must be closed and the
     * registry must be empty.
     */
    @Test
    void testNonBlockingClose() throws Exception {
        this.setup(Integer.MAX_VALUE);

        final BlockingTestCloseable blockingCloseable = new BlockingTestCloseable();
        this.registerCloseable(blockingCloseable);
        Assertions.assertThat(this.closeableRegistry.getNumberOfRegisteredCloseables()).isOne();

        final Thread closer = new Thread(() -> {
            try {
                this.closeableRegistry.close();
            }
            catch (IOException ignored) {
                // Deliberately ignored: a failed or incomplete close is detected by the
                // assertions at the end of the test.
            }
        });
        closer.start();

        final TestCloseable testCloseable = new TestCloseable();
        try {
            // Wait (bounded) until the closer thread is inside BlockingTestCloseable.close().
            blockingCloseable.awaitClose(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            Assertions.assertThatThrownBy(() -> this.registerCloseable(testCloseable)).isInstanceOf(IOException.class);
        }
        finally {
            // Always release the closer thread, even when a check above fails; otherwise the
            // non-daemon thread would stay blocked in close() forever.
            blockingCloseable.unblockClose();
        }
        closer.join();

        Assertions.assertThat(testCloseable.isClosed()).isTrue();
        Assertions.assertThat(this.closeableRegistry.getNumberOfRegisteredCloseables()).isZero();
    }

    /**
     * Closeable that records whether it has been closed and fails an assertion when
     * {@link #close()} is called a second time.
     */
    private static class TestCloseable implements Closeable {
        private final AtomicBoolean closed = new AtomicBoolean();

        /** Flips the closed flag from false to true; asserts that this call performed the flip. */
        @Override
        public void close() throws IOException {
            final boolean firstClose = this.closed.compareAndSet(false, true);
            Assertions.assertThat(firstClose).as("TestCloseable was already closed").isTrue();
        }

        /** @return {@code true} once {@link #close()} has been called */
        public boolean isClosed() {
            return this.closed.get();
        }
    }

    /**
     * Closeable whose {@link #close()} signals that it was entered and then blocks until
     * {@link #unblockClose()} is called.
     */
    private static class BlockingTestCloseable implements Closeable {
        /** Counted down as soon as {@link #close()} is entered. */
        private final CountDownLatch closeCalledLatch = new CountDownLatch(1);
        /** Holds {@link #close()} back until {@link #unblockClose()} counts it down. */
        private final CountDownLatch blockCloseLatch = new CountDownLatch(1);

        /**
         * Signals entry, then waits without a timeout for {@link #unblockClose()}. If the waiting
         * thread is interrupted, the interrupt status is restored and the method returns.
         */
        @Override
        public void close() throws IOException {
            this.closeCalledLatch.countDown();
            try {
                this.blockCloseLatch.await();
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /** Releases a thread that is (or will be) waiting inside {@link #close()}. */
        public void unblockClose() {
            this.blockCloseLatch.countDown();
        }

        /**
         * Waits until {@link #close()} has been entered and asserts that this happened within the
         * given timeout.
         *
         * @param timeout maximum time to wait
         * @param timeUnit unit of {@code timeout}
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
            final boolean closeWasCalled = this.closeCalledLatch.await(timeout, timeUnit);
            Assertions.assertThat(closeWasCalled).isTrue();
        }
    }

    /**
     * Stub input stream that tracks its open/closed state in a shared counter: the counter is
     * incremented on construction and decremented on every {@link #close()} call. All data
     * operations are no-ops.
     */
    protected static final class TestStream extends FSDataInputStream {
        private final AtomicInteger refCount;

        /**
         * @param refCount shared counter; incremented once by this constructor
         */
        public TestStream(AtomicInteger refCount) {
            refCount.incrementAndGet();
            this.refCount = refCount;
        }

        /** Does nothing; the requested position is ignored. */
        public void seek(long desired) throws IOException {
            // intentionally empty
        }

        /** @return always 0 */
        public long getPos() throws IOException {
            return 0L;
        }

        /** @return always 0 */
        public int read() throws IOException {
            return 0;
        }

        /** Decrements the shared counter; there is no guard against repeated calls. */
        public synchronized void close() throws IOException {
            this.refCount.decrementAndGet();
        }
    }

    /**
     * Thread that repeatedly calls {@link #createAndRegisterStream()} until it has produced
     * {@code maxStreams} streams, the call throws, or the thread is interrupted.
     *
     * <p>With {@code maxStreams == Integer.MAX_VALUE} the stream count is never advanced, so the
     * thread keeps producing until {@link #createAndRegisterStream()} throws or it is interrupted.
     */
    protected static abstract class ProducerThread<C extends Closeable, E extends C, T>
    extends Thread {
        /** Registry handed in by the creator; available to subclasses for registering streams. */
        protected final AbstractAutoCloseableRegistry<C, E, T, IOException> registry;
        /** Shared counter handed in by the creator; available to subclasses. */
        protected final AtomicInteger refCount;
        /** Upper bound for {@link #numStreams}; {@code Integer.MAX_VALUE} disables counting. */
        protected final int maxStreams;
        /** Number of loop iterations counted so far; stays 0 when counting is disabled. */
        protected int numStreams;

        /**
         * @param registry registry made available to subclasses
         * @param refCount shared counter made available to subclasses
         * @param maxStreams number of streams to produce, or {@code Integer.MAX_VALUE} for no limit
         */
        public ProducerThread(AbstractAutoCloseableRegistry<C, E, T, IOException> registry, AtomicInteger refCount, int maxStreams) {
            this.registry = registry;
            this.refCount = refCount;
            this.maxStreams = maxStreams;
            this.numStreams = 0;
        }

        /**
         * Creates one stream and registers it; called once per loop iteration of {@link #run()}.
         *
         * @throws IOException if creation or registration fails; this ends {@link #run()}
         */
        protected abstract void createAndRegisterStream() throws IOException;

        /**
         * Produces streams in a loop. Any exception thrown by {@link #createAndRegisterStream()}
         * ends the loop silently. An interrupt ends the loop as well, with the thread's interrupt
         * status restored.
         */
        @Override
        public void run() {
            try {
                while (this.numStreams < this.maxStreams) {
                    this.createAndRegisterStream();
                    try {
                        Thread.sleep(0L);
                    }
                    catch (InterruptedException e) {
                        // Restore the status cleared by sleep() and stop producing, instead of
                        // swallowing the interrupt and looping on.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // Integer.MAX_VALUE means "unbounded": never advance the counter in that case.
                    if (this.maxStreams != Integer.MAX_VALUE) {
                        ++this.numStreams;
                    }
                }
            }
            catch (Exception ignored) {
                // Deliberate: a failing createAndRegisterStream() is the regular way for an
                // unbounded producer to terminate.
            }
        }
    }
}

