package org.apache.hadoop.util.functional;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.configuration2.tree.DefaultExpressionEngineSymbols;
import org.apache.hadoop.test.HadoopTestBase;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.functional.TaskPool;
import org.jline.reader.LineReader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:WEB-INF/lib/hadoop-common-3.3.5-tests.jar:org/apache/hadoop/util/functional/TestTaskPool.class */
public class TestTaskPool extends HadoopTestBase {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TestTaskPool.class);
    public static final int ITEM_COUNT = 16;
    private static final int FAILPOINT = 8;
    private final int numThreads;
    private ExecutorService threadPool;
    private TaskPool.Submitter submitter;
    private final CounterTask failingTask = new CounterTask("failing committer", 8, (v0) -> {
        return v0.commit();
    });
    private final FailureCounter failures = new FailureCounter("failures", 0, null);
    private final CounterTask reverter = new CounterTask("reverter", 0, (v0) -> {
        return v0.revert();
    });
    private final CounterTask aborter = new CounterTask("aborter", 0, (v0) -> {
        return v0.abort();
    });
    private List<Item> items;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-common-3.3.5-tests.jar:org/apache/hadoop/util/functional/TestTaskPool$BaseCounter.class */
    public class BaseCounter {
        private final AtomicInteger counter = new AtomicInteger(0);
        private final int limit;
        private final String name;
        private Item item;
        private final Optional<Function<Item, Boolean>> action;

        BaseCounter(String str, int i, Function<Item, Boolean> function) {
            this.name = str;
            this.limit = i;
            this.action = Optional.ofNullable(function);
        }

        void process(Item item) throws IOException {
            this.item = item;
            if (this.limit == this.counter.incrementAndGet()) {
                item.fail();
                TestTaskPool.LOG.info("{}: Failed {}", this, item);
                throw new IOException(String.format("%s: Limit %d reached for %s", this, Integer.valueOf(this.limit), item));
            }
            String item2 = item.toString();
            this.action.map(function -> {
                return (Boolean) function.apply(item);
            });
            TestTaskPool.LOG.info("{}: {} -> {}", this, item2, item);
        }

        int getCount() {
            return this.counter.get();
        }

        Item getItem() {
            return this.item;
        }

        void assertInvoked(String str, int i) {
            Assert.assertEquals(toString() + ": " + str, i, getCount());
        }

        void assertInvokedAtLeast(String str, int i) {
            int count = getCount();
            Assert.assertTrue(toString() + ": " + str + "-expected " + i + " invocations, but got " + count + " in " + TestTaskPool.this.itemsToString(), i <= count);
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("BaseCounter{");
            sb.append("name='").append(this.name).append('\'');
            sb.append(", count=").append(this.counter.get());
            sb.append(", limit=").append(this.limit);
            sb.append(", item=").append(this.item);
            sb.append('}');
            return sb.toString();
        }
    }

    /* loaded from: input_file:WEB-INF/lib/hadoop-common-3.3.5-tests.jar:org/apache/hadoop/util/functional/TestTaskPool$CounterTask.class */
    private final class CounterTask extends BaseCounter implements TaskPool.Task<Item, IOException> {
        private CounterTask(String str, int i, Function<Item, Boolean> function) {
            super(str, i, function);
        }

        @Override // org.apache.hadoop.util.functional.TaskPool.Task
        public void run(Item item) throws IOException {
            process(item);
        }
    }

    /* loaded from: input_file:WEB-INF/lib/hadoop-common-3.3.5-tests.jar:org/apache/hadoop/util/functional/TestTaskPool$FailureCounter.class */
    private final class FailureCounter extends BaseCounter implements TaskPool.FailureTask<Item, IOException> {
        private Exception exception;

        private FailureCounter(String str, int i, Function<Item, Boolean> function) {
            super(str, i, function);
        }

        @Override // org.apache.hadoop.util.functional.TaskPool.FailureTask
        public void run(Item item, Exception exc) throws IOException {
            process(item);
            this.exception = exc;
        }

        private Exception getException() {
            return this.exception;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-common-3.3.5-tests.jar:org/apache/hadoop/util/functional/TestTaskPool$Item.class */
    public final class Item {
        private final int id;
        private final String text;
        private volatile boolean committed;
        private volatile boolean aborted;
        private volatile boolean reverted;
        private volatile boolean failed;

        private Item(int i, String str) {
            this.id = i;
            this.text = str;
        }

        boolean commit() {
            this.committed = true;
            return true;
        }

        boolean abort() {
            this.aborted = true;
            return true;
        }

        boolean revert() {
            this.reverted = true;
            return true;
        }

        boolean fail() {
            this.failed = true;
            return true;
        }

        public Item assertCommitted() {
            Assert.assertTrue(toString() + " was not committed in\n" + TestTaskPool.this.itemsToString(), this.committed);
            return this;
        }

        public Item assertCommittedOrFailed() {
            Assert.assertTrue(toString() + " was not committed nor failed in\n" + TestTaskPool.this.itemsToString(), this.committed || this.failed);
            return this;
        }

        public Item assertAborted() {
            Assert.assertTrue(toString() + " was not aborted in\n" + TestTaskPool.this.itemsToString(), this.aborted);
            return this;
        }

        public Item assertReverted() {
            Assert.assertTrue(toString() + " was not reverted in\n" + TestTaskPool.this.itemsToString(), this.reverted);
            return this;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("Item{");
            sb.append(String.format("[%02d]", Integer.valueOf(this.id)));
            sb.append(", committed=").append(this.committed);
            sb.append(", aborted=").append(this.aborted);
            sb.append(", reverted=").append(this.reverted);
            sb.append(", failed=").append(this.failed);
            sb.append(", text=").append(this.text);
            sb.append('}');
            return sb.toString();
        }
    }

    /* loaded from: input_file:WEB-INF/lib/hadoop-common-3.3.5-tests.jar:org/apache/hadoop/util/functional/TestTaskPool$PoolSubmitter.class */
    private class PoolSubmitter implements TaskPool.Submitter {
        private PoolSubmitter() {
        }

        @Override // org.apache.hadoop.util.functional.TaskPool.Submitter
        public Future<?> submit(Runnable runnable) {
            return TestTaskPool.this.threadPool.submit(runnable);
        }
    }

    @Parameterized.Parameters(name = "threads={0}")
    public static Collection<Object[]> params() {
        return Arrays.asList(new Object[]{0}, new Object[]{1}, new Object[]{3}, new Object[]{8}, new Object[]{16});
    }

    public TestTaskPool(int i) {
        this.numThreads = i;
    }

    public boolean isParallel() {
        return this.numThreads > 1;
    }

    @Before
    public void setup() {
        this.items = (List) IntStream.rangeClosed(1, 16).mapToObj(i -> {
            return new Item(i, String.format("With %d threads", Integer.valueOf(this.numThreads)));
        }).collect(Collectors.toList());
        if (this.numThreads <= 0) {
            this.submitter = null;
        } else {
            this.threadPool = Executors.newFixedThreadPool(this.numThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(getMethodName() + "-pool-%d").build());
            this.submitter = new PoolSubmitter();
        }
    }

    @After
    public void teardown() {
        if (this.threadPool != null) {
            this.threadPool.shutdown();
            this.threadPool = null;
        }
    }

    private TaskPool.Builder<Item> builder() {
        return TaskPool.foreach(this.items).executeWith(this.submitter);
    }

    private void assertRun(TaskPool.Builder<Item> builder, CounterTask counterTask) throws IOException {
        assertTrue("Run of " + counterTask + " failed", builder.run(counterTask));
    }

    private void assertFailed(TaskPool.Builder<Item> builder, CounterTask counterTask) throws IOException {
        assertFalse("Run of " + counterTask + " unexpectedly succeeded", builder.run(counterTask));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String itemsToString() {
        return "[" + ((String) this.items.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining("\n"))) + DefaultExpressionEngineSymbols.DEFAULT_ATTRIBUTE_END;
    }

    @Test
    public void testSimpleInvocation() throws Throwable {
        CounterTask counterTask = new CounterTask("simple", 0, (v0) -> {
            return v0.commit();
        });
        assertRun(builder(), counterTask);
        counterTask.assertInvoked("", 16);
    }

    @Test
    public void testFailNoStoppingSuppressed() throws Throwable {
        assertFailed(builder().suppressExceptions(), this.failingTask);
        this.failingTask.assertInvoked("Continued through operations", 16);
        this.items.forEach((v0) -> {
            v0.assertCommittedOrFailed();
        });
    }

    @Test
    public void testFailFastSuppressed() throws Throwable {
        assertFailed(builder().suppressExceptions().stopOnFailure(), this.failingTask);
        if (isParallel()) {
            this.failingTask.assertInvokedAtLeast("stop fast", 8);
        } else {
            this.failingTask.assertInvoked("stop fast", 8);
        }
    }

    @Test
    public void testFailedCallAbortSuppressed() throws Throwable {
        assertFailed(builder().stopOnFailure().suppressExceptions().abortWith(this.aborter), this.failingTask);
        this.failingTask.assertInvokedAtLeast("success", 8);
        if (isParallel()) {
            return;
        }
        this.aborter.assertInvokedAtLeast(LineReader.SEND_BREAK, 1);
        this.items.stream().filter(item -> {
            return !item.committed;
        }).map((v0) -> {
            return v0.assertAborted();
        });
        this.items.stream().filter(item2 -> {
            return item2.committed;
        }).forEach(item3 -> {
            assertFalse(item3.toString(), item3.aborted);
        });
    }

    @Test
    public void testFailedCalledWhenNotStoppingSuppressed() throws Throwable {
        assertFailed(builder().suppressExceptions().onFailure(this.failures), this.failingTask);
        this.failingTask.assertInvokedAtLeast("success", 8);
        this.failures.assertInvoked("failure event", 1);
    }

    @Test
    public void testFailFastCallRevertSuppressed() throws Throwable {
        assertFailed(builder().stopOnFailure().revertWith(this.reverter).abortWith(this.aborter).suppressExceptions().onFailure(this.failures), this.failingTask);
        this.failingTask.assertInvokedAtLeast("success", 8);
        if (!isParallel()) {
            this.aborter.assertInvokedAtLeast(LineReader.SEND_BREAK, 1);
            this.items.stream().filter(item -> {
                return !item.committed;
            }).filter(item2 -> {
                return !item2.failed;
            }).forEach((v0) -> {
                v0.assertAborted();
            });
        }
        this.items.stream().filter(item3 -> {
            return item3.committed && !item3.failed;
        }).forEach((v0) -> {
            v0.assertReverted();
        });
        this.items.stream().filter(item4 -> {
            return item4.reverted;
        }).forEach((v0) -> {
            v0.assertCommitted();
        });
        this.failures.assertInvoked("failure event", 1);
    }

    @Test
    public void testFailSlowCallRevertSuppressed() throws Throwable {
        assertFailed(builder().suppressExceptions().revertWith(this.reverter).onFailure(this.failures), this.failingTask);
        this.failingTask.assertInvokedAtLeast("success", 8);
        int i = this.failures.getItem().id;
        this.items.stream().filter(item -> {
            return item.id != i;
        }).filter(item2 -> {
            return item2.committed;
        }).forEach((v0) -> {
            v0.assertReverted();
        });
        this.items.stream().filter(item3 -> {
            return item3.reverted;
        }).forEach((v0) -> {
            v0.assertCommitted();
        });
        this.failures.assertInvoked("failure event", 1);
    }

    @Test
    public void testFailFastExceptions() throws Throwable {
        LambdaTestUtils.intercept(IOException.class, () -> {
            return Boolean.valueOf(builder().stopOnFailure().run(this.failingTask));
        });
        if (isParallel()) {
            this.failingTask.assertInvokedAtLeast("stop fast", 8);
        } else {
            this.failingTask.assertInvoked("stop fast", 8);
        }
    }

    @Test
    public void testFailSlowExceptions() throws Throwable {
        LambdaTestUtils.intercept(IOException.class, () -> {
            return Boolean.valueOf(builder().run(this.failingTask));
        });
        this.failingTask.assertInvoked("continued through operations", 16);
        this.items.forEach((v0) -> {
            v0.assertCommittedOrFailed();
        });
    }

    @Test
    public void testFailFastExceptionsWithAbortFailure() throws Throwable {
        CounterTask counterTask = new CounterTask("task", 1, (v0) -> {
            return v0.commit();
        });
        CounterTask counterTask2 = new CounterTask("aborter", 1, (v0) -> {
            return v0.abort();
        });
        LambdaTestUtils.intercept(IOException.class, () -> {
            return Boolean.valueOf(builder().stopOnFailure().abortWith(counterTask2).run(counterTask));
        });
        if (isParallel()) {
            return;
        }
        counterTask2.assertInvokedAtLeast(LineReader.SEND_BREAK, 15);
    }

    @Test
    public void testFailFastExceptionsWithAbortFailureStopped() throws Throwable {
        CounterTask counterTask = new CounterTask("task", 1, (v0) -> {
            return v0.commit();
        });
        CounterTask counterTask2 = new CounterTask("aborter", 1, (v0) -> {
            return v0.abort();
        });
        LambdaTestUtils.intercept(IOException.class, () -> {
            return Boolean.valueOf(builder().stopOnFailure().stopAbortsOnFailure().abortWith(counterTask2).run(counterTask));
        });
        if (isParallel()) {
            return;
        }
        counterTask2.assertInvoked(LineReader.SEND_BREAK, 1);
    }

    @Test
    public void testRevertAllSuppressed() throws Throwable {
        CounterTask counterTask = new CounterTask("task", 16, (v0) -> {
            return v0.commit();
        });
        assertFailed(builder().suppressExceptions().stopOnFailure().revertWith(this.reverter).abortWith(this.aborter).onFailure(this.failures), counterTask);
        counterTask.assertInvoked("success", 16);
        assertEquals(16L, 1 + this.aborter.getCount() + this.reverter.getCount());
        int i = this.failures.getItem().id;
        this.items.stream().filter(item -> {
            return item.id != i;
        }).filter(item2 -> {
            return item2.committed;
        }).forEach((v0) -> {
            v0.assertReverted();
        });
        this.items.stream().filter(item3 -> {
            return item3.id != i;
        }).filter(item4 -> {
            return !item4.committed;
        }).forEach((v0) -> {
            v0.assertAborted();
        });
        this.items.stream().filter(item5 -> {
            return item5.reverted;
        }).forEach((v0) -> {
            v0.assertCommitted();
        });
        this.failures.assertInvoked("failure event", 1);
    }
}
