package org.apache.paimon.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.paimon.data.serializer.IntSerializer;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.ParallelExecution;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/utils/ParallelExecutionTest.class */
public class ParallelExecutionTest {
    @Test
    public void testNormal() {
        Assertions.assertThat(collect(new ParallelExecution<>(new IntSerializer(), 1024, 2, Arrays.asList(() -> {
            return Pair.of(create(new LinkedList(Arrays.asList(Arrays.asList(1, 5, 6), Arrays.asList(2, 7)))), 1);
        }, () -> {
            return Pair.of(create(new LinkedList(Arrays.asList(Arrays.asList(33, 55), Arrays.asList(22, 77)))), 2);
        }, () -> {
            return Pair.of(create(new LinkedList(Arrays.asList(Arrays.asList(333, 555), Arrays.asList(222, 777)))), 3);
        })))).containsExactlyInAnyOrder(new Pair[]{Pair.of(1, 1), Pair.of(5, 1), Pair.of(6, 1), Pair.of(2, 1), Pair.of(7, 1), Pair.of(33, 2), Pair.of(55, 2), Pair.of(22, 2), Pair.of(77, 2), Pair.of(333, 3), Pair.of(555, 3), Pair.of(222, 3), Pair.of(777, 3)});
    }

    @Test
    public void testException() {
        final String str = "Test Exception";
        Supplier supplier = () -> {
            return Pair.of(create(new LinkedList(Arrays.asList(Arrays.asList(1, 5, 6), Arrays.asList(2, 7)))), 1);
        };
        RecordReader<Integer> recordReader = new RecordReader<Integer>() { // from class: org.apache.paimon.utils.ParallelExecutionTest.1
            @Nullable
            public RecordReader.RecordIterator<Integer> readBatch() {
                throw new RuntimeException(str);
            }

            public void close() {
            }
        };
        ParallelExecution parallelExecution = new ParallelExecution(new IntSerializer(), 1024, 2, Arrays.asList(supplier, supplier, () -> {
            return Pair.of(recordReader, 2);
        }));
        Assertions.assertThatThrownBy(() -> {
            collect(parallelExecution);
        }).hasMessageContaining("Test Exception");
    }

    private RecordReader<Integer> create(final Queue<List<Integer>> queue) {
        return new RecordReader<Integer>() { // from class: org.apache.paimon.utils.ParallelExecutionTest.2
            @Nullable
            public RecordReader.RecordIterator<Integer> readBatch() {
                List list = (List) queue.poll();
                if (list == null) {
                    return null;
                }
                final LinkedList linkedList = new LinkedList(list);
                return new RecordReader.RecordIterator<Integer>() { // from class: org.apache.paimon.utils.ParallelExecutionTest.2.1
                    @Nullable
                    /* renamed from: next, reason: merged with bridge method [inline-methods] */
                    public Integer m54next() {
                        return (Integer) linkedList.poll();
                    }

                    public void releaseBatch() {
                    }
                };
            }

            public void close() {
            }
        };
    }

    private List<Pair<Integer, Integer>> collect(ParallelExecution<Integer, Integer> parallelExecution) {
        ArrayList arrayList = new ArrayList();
        while (true) {
            try {
                ParallelExecution.ParallelBatch take = parallelExecution.take();
                if (take == null) {
                    return arrayList;
                }
                while (true) {
                    Integer num = (Integer) take.next();
                    if (num == null) {
                        break;
                    }
                    arrayList.add(Pair.of(num, take.extraMessage()));
                }
                take.releaseBatch();
            } catch (IOException | InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
}
