package org.apache.flink.table.runtime.arrow.sources;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.arrow.ArrowUtils;
import org.apache.flink.table.runtime.arrow.ArrowWriter;
import org.apache.flink.testutils.CustomEqualityMatcher;
import org.apache.flink.testutils.DeeplyEqualsChecker;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.HamcrestCondition;
import org.junit.jupiter.api.Test;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/table/runtime/arrow/sources/ArrowSourceFunctionTestBase.class */
public abstract class ArrowSourceFunctionTestBase {
    final VectorSchemaRoot root;
    private final TypeSerializer<RowData> typeSerializer;
    private final Comparator<RowData> comparator;
    private final DeeplyEqualsChecker checker;

    /* loaded from: input_file:org/apache/flink/table/runtime/arrow/sources/ArrowSourceFunctionTestBase$DummySourceContext.class */
    private static abstract class DummySourceContext<T> implements SourceFunction.SourceContext<T> {
        private final Object lock;

        private DummySourceContext() {
            this.lock = new Object();
        }

        public void collectWithTimestamp(T t, long j) {
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markAsTemporarilyIdle() {
        }

        public Object getCheckpointLock() {
            return this.lock;
        }

        public void close() {
        }
    }

    ArrowSourceFunctionTestBase(VectorSchemaRoot vectorSchemaRoot, TypeSerializer<RowData> typeSerializer, Comparator<RowData> comparator) {
        this(vectorSchemaRoot, typeSerializer, comparator, new DeeplyEqualsChecker());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ArrowSourceFunctionTestBase(VectorSchemaRoot vectorSchemaRoot, TypeSerializer<RowData> typeSerializer, Comparator<RowData> comparator, DeeplyEqualsChecker deeplyEqualsChecker) {
        this.root = (VectorSchemaRoot) Preconditions.checkNotNull(vectorSchemaRoot);
        this.typeSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.comparator = (Comparator) Preconditions.checkNotNull(comparator);
        this.checker = (DeeplyEqualsChecker) Preconditions.checkNotNull(deeplyEqualsChecker);
    }

    @Test
    void testRestore() throws Exception {
        OperatorSubtaskState snapshot;
        Tuple2<List<RowData>, Integer> testData = getTestData();
        ArrowSourceFunction createTestArrowSourceFunction = createTestArrowSourceFunction((List) testData.f0, ((Integer) testData.f1).intValue());
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(new StreamSource(createTestArrowSourceFunction), 1, 1, 0);
        abstractStreamOperatorTestHarness.open();
        Throwable[] thArr = new Throwable[1];
        final MultiShotLatch multiShotLatch = new MultiShotLatch();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final ArrayList arrayList = new ArrayList();
        DummySourceContext<RowData> dummySourceContext = new DummySourceContext<RowData>() { // from class: org.apache.flink.table.runtime.arrow.sources.ArrowSourceFunctionTestBase.1
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            public void collect(RowData rowData) {
                if (atomicInteger.get() == 2) {
                    multiShotLatch.trigger();
                    throw new RuntimeException("Fail the arrow source");
                }
                arrayList.add(ArrowSourceFunctionTestBase.this.typeSerializer.copy(rowData));
                atomicInteger.incrementAndGet();
            }
        };
        Thread thread = new Thread(() -> {
            try {
                createTestArrowSourceFunction.run(dummySourceContext);
            } catch (Throwable th) {
                if (th.getMessage().equals("Fail the arrow source")) {
                    return;
                }
                thArr[0] = th;
            }
        });
        thread.start();
        if (!multiShotLatch.isTriggered()) {
            multiShotLatch.await();
        }
        synchronized (dummySourceContext.getCheckpointLock()) {
            snapshot = abstractStreamOperatorTestHarness.snapshot(0L, 0L);
        }
        thread.join();
        abstractStreamOperatorTestHarness.close();
        ArrowSourceFunction createTestArrowSourceFunction2 = createTestArrowSourceFunction((List) testData.f0, ((Integer) testData.f1).intValue());
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness2 = new AbstractStreamOperatorTestHarness(new StreamSource(createTestArrowSourceFunction2), 1, 1, 0);
        abstractStreamOperatorTestHarness2.initializeState(snapshot);
        abstractStreamOperatorTestHarness2.open();
        Thread thread2 = new Thread(() -> {
            try {
                createTestArrowSourceFunction2.run(new DummySourceContext<RowData>() { // from class: org.apache.flink.table.runtime.arrow.sources.ArrowSourceFunctionTestBase.2
                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super();
                    }

                    public void collect(RowData rowData) {
                        arrayList.add(ArrowSourceFunctionTestBase.this.typeSerializer.copy(rowData));
                        if (atomicInteger.incrementAndGet() == ((List) testData.f0).size()) {
                            multiShotLatch.trigger();
                        }
                    }
                });
            } catch (Throwable th) {
                thArr[0] = th;
            }
        });
        thread2.start();
        if (!multiShotLatch.isTriggered()) {
            multiShotLatch.await();
        }
        thread2.join();
        Assertions.assertThat(thArr[0]).isNull();
        Assertions.assertThat((List) testData.f0).hasSize(atomicInteger.get());
        checkElementsEquals(arrayList, (List) testData.f0);
    }

    @Test
    void testParallelProcessing() throws Exception {
        Tuple2<List<RowData>, Integer> testData = getTestData();
        ArrowSourceFunction createTestArrowSourceFunction = createTestArrowSourceFunction((List) testData.f0, ((Integer) testData.f1).intValue());
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(new StreamSource(createTestArrowSourceFunction), 2, 2, 0);
        abstractStreamOperatorTestHarness.open();
        Throwable[] thArr = new Throwable[2];
        OneShotLatch oneShotLatch = new OneShotLatch();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        List<RowData> synchronizedList = Collections.synchronizedList(new ArrayList());
        Thread thread = new Thread(() -> {
            try {
                createTestArrowSourceFunction.run(new DummySourceContext<RowData>() { // from class: org.apache.flink.table.runtime.arrow.sources.ArrowSourceFunctionTestBase.3
                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super();
                    }

                    public void collect(RowData rowData) {
                        synchronizedList.add(ArrowSourceFunctionTestBase.this.typeSerializer.copy(rowData));
                        if (atomicInteger.incrementAndGet() == ((List) testData.f0).size()) {
                            oneShotLatch.trigger();
                        }
                    }
                });
            } catch (Throwable th) {
                thArr[0] = th;
            }
        });
        thread.start();
        ArrowSourceFunction createTestArrowSourceFunction2 = createTestArrowSourceFunction((List) testData.f0, ((Integer) testData.f1).intValue());
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness2 = new AbstractStreamOperatorTestHarness(new StreamSource(createTestArrowSourceFunction2), 2, 2, 1);
        abstractStreamOperatorTestHarness2.open();
        Thread thread2 = new Thread(() -> {
            try {
                createTestArrowSourceFunction2.run(new DummySourceContext<RowData>() { // from class: org.apache.flink.table.runtime.arrow.sources.ArrowSourceFunctionTestBase.4
                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super();
                    }

                    public void collect(RowData rowData) {
                        synchronizedList.add(ArrowSourceFunctionTestBase.this.typeSerializer.copy(rowData));
                        if (atomicInteger.incrementAndGet() == ((List) testData.f0).size()) {
                            oneShotLatch.trigger();
                        }
                    }
                });
            } catch (Throwable th) {
                thArr[1] = th;
            }
        });
        thread2.start();
        if (!oneShotLatch.isTriggered()) {
            oneShotLatch.await();
        }
        thread.join();
        thread2.join();
        abstractStreamOperatorTestHarness.close();
        abstractStreamOperatorTestHarness2.close();
        Assertions.assertThat(thArr[0]).isNull();
        Assertions.assertThat(thArr[1]).isNull();
        Assertions.assertThat((List) testData.f0).hasSize(atomicInteger.get());
        checkElementsEquals(synchronizedList, (List) testData.f0);
    }

    public abstract Tuple2<List<RowData>, Integer> getTestData();

    public abstract ArrowWriter<RowData> createArrowWriter();

    public abstract ArrowSourceFunction createArrowSourceFunction(byte[][] bArr);

    private void checkElementsEquals(List<RowData> list, List<RowData> list2) {
        Assertions.assertThat(list).hasSize(list2.size());
        list.sort(this.comparator);
        list2.sort(this.comparator);
        for (int i = 0; i < list2.size(); i++) {
            Assertions.assertThat(list.get(i)).is(HamcrestCondition.matching(CustomEqualityMatcher.deeplyEquals(list2.get(i)).withChecker(this.checker)));
        }
    }

    private ArrowSourceFunction createTestArrowSourceFunction(List<RowData> list, int i) throws IOException {
        ArrowWriter<RowData> createArrowWriter = createArrowWriter();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(this.root, (DictionaryProvider) null, byteArrayOutputStream);
        arrowStreamWriter.start();
        Iterator it = Lists.partition(list, (list.size() / i) + 1).iterator();
        while (it.hasNext()) {
            Iterator it2 = ((List) it.next()).iterator();
            while (it2.hasNext()) {
                createArrowWriter.write((RowData) it2.next());
            }
            createArrowWriter.finish();
            arrowStreamWriter.writeBatch();
            createArrowWriter.reset();
        }
        ArrowSourceFunction createArrowSourceFunction = createArrowSourceFunction(ArrowUtils.readArrowBatches(Channels.newChannel(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))));
        createArrowSourceFunction.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0));
        return createArrowSourceFunction;
    }
}
