package org.apache.iceberg.util;

import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.HashMultiset;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMultiset;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Multiset;
import org.apache.iceberg.util.ParallelIterable;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/iceberg/util/TestParallelIterable.class */
public class TestParallelIterable {
    @Test
    public void closeParallelIteratorWithoutCompleteIteration() {
        ParallelIterable.ParallelIterator it = new ParallelIterable(Iterables.transform(Lists.newArrayList(new Integer[]{1, 2, 3, 4, 5}), num -> {
            return new CloseableIterable<Integer>() { // from class: org.apache.iceberg.util.TestParallelIterable.1
                public void close() {
                }

                /* renamed from: iterator, reason: merged with bridge method [inline-methods] */
                public CloseableIterator<Integer> m87iterator() {
                    return CloseableIterator.withClose(Collections.singletonList(num).iterator());
                }
            };
        }), Executors.newFixedThreadPool(1)).iterator();
        Assertions.assertThat(it.hasNext()).isTrue();
        Assertions.assertThat((Integer) it.next()).isNotNull();
        Awaitility.await("Queue is populated").atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
            queueHasElements(it);
        });
        it.close();
        Awaitility.await("Queue is cleared").atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assertions.assertThat(it.queueSize()).isEqualTo(0);
        });
    }

    @Test
    public void closeMoreDataParallelIteratorWithoutCompleteIteration() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        Iterator<Integer> it = new Iterator<Integer>() { // from class: org.apache.iceberg.util.TestParallelIterable.2
            private int number = 1;

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.number > 1000) {
                    return false;
                }
                this.number++;
                return true;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public Integer next() {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                }
                return Integer.valueOf(this.number);
            }
        };
        ParallelIterable.ParallelIterator it2 = new ParallelIterable(Iterables.transform(Lists.newArrayList(new Integer[]{1}), num -> {
            return new CloseableIterable<Integer>() { // from class: org.apache.iceberg.util.TestParallelIterable.3
                public void close() {
                }

                /* renamed from: iterator, reason: merged with bridge method [inline-methods] */
                public CloseableIterator<Integer> m88iterator() {
                    return CloseableIterator.withClose(it);
                }
            };
        }), newFixedThreadPool).iterator();
        Assertions.assertThat(it2.hasNext()).isTrue();
        Assertions.assertThat((Integer) it2.next()).isNotNull();
        Awaitility.await("Queue is populated").atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
            queueHasElements(it2);
        });
        it2.close();
        Awaitility.await("Queue is cleared").atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assertions.assertThat(it2.queueSize()).as("Queue is not empty after cleaning", new Object[0]).isEqualTo(0);
        });
    }

    @Test
    public void limitQueueSize() {
        ImmutableList of = ImmutableList.of(() -> {
            return IntStream.range(0, 100).iterator();
        }, () -> {
            return IntStream.range(0, 100).iterator();
        }, () -> {
            return IntStream.range(0, 100).iterator();
        });
        Multiset multiset = (Multiset) IntStream.range(0, 100).boxed().flatMap(num -> {
            return Stream.of((Object[]) new Integer[]{num, num, num});
        }).collect(ImmutableMultiset.toImmutableMultiset());
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        ParallelIterable.ParallelIterator it = new ParallelIterable(of, newCachedThreadPool, 20).iterator();
        HashMultiset create = HashMultiset.create();
        while (it.hasNext()) {
            Assertions.assertThat(it.queueSize()).as("iterator internal queue size", new Object[0]).isLessThanOrEqualTo(20 + of.size());
            create.add((Integer) it.next());
        }
        Assertions.assertThat(create).as("multiset of values returned by the iterator", new Object[0]).isEqualTo(multiset);
        it.close();
        newCachedThreadPool.shutdownNow();
    }

    @Test
    public void queueSizeOne() {
        ImmutableList of = ImmutableList.of(() -> {
            return IntStream.range(0, 100).iterator();
        }, () -> {
            return IntStream.range(0, 100).iterator();
        }, () -> {
            return IntStream.range(0, 100).iterator();
        });
        Multiset multiset = (Multiset) IntStream.range(0, 100).boxed().flatMap(num -> {
            return Stream.of((Object[]) new Integer[]{num, num, num});
        }).collect(ImmutableMultiset.toImmutableMultiset());
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        ParallelIterable.ParallelIterator it = new ParallelIterable(of, newCachedThreadPool, 1).iterator();
        HashMultiset create = HashMultiset.create();
        while (it.hasNext()) {
            Assertions.assertThat(it.queueSize()).as("iterator internal queue size", new Object[0]).isLessThanOrEqualTo(1 + of.size());
            create.add((Integer) it.next());
        }
        Assertions.assertThat(create).as("multiset of values returned by the iterator", new Object[0]).isEqualTo(multiset);
        it.close();
        newCachedThreadPool.shutdownNow();
    }

    private void queueHasElements(ParallelIterable.ParallelIterator<Integer> parallelIterator) {
        Assertions.assertThat(parallelIterator.hasNext()).isTrue();
        Assertions.assertThat((Integer) parallelIterator.next()).isNotNull();
        Assertions.assertThat(parallelIterator.queueSize()).as("queue size", new Object[0]).isGreaterThan(0);
    }
}
