package org.apache.paimon.flink.source.operator;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkTestBase;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/source/operator/OperatorSourceTest.class */
public class OperatorSourceTest {

    @TempDir
    Path tempDir;
    private Table table;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/flink/source/operator/OperatorSourceTest$DummySourceContext.class */
    public static abstract class DummySourceContext implements SourceFunction.SourceContext<Split> {
        private final Object lock;

        private DummySourceContext() {
            this.lock = new Object();
        }

        public void collectWithTimestamp(Split split, long j) {
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markAsTemporarilyIdle() {
        }

        public Object getCheckpointLock() {
            return this.lock;
        }

        public void close() {
        }
    }

    @BeforeEach
    public void before() throws Catalog.TableAlreadyExistException, Catalog.DatabaseNotExistException, Catalog.TableNotExistException, Catalog.DatabaseAlreadyExistException {
        Catalog createCatalog = CatalogFactory.createCatalog(CatalogContext.create(new org.apache.paimon.fs.Path(this.tempDir.toUri())));
        Schema build = Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.INT()).column("c", DataTypes.INT()).primaryKey(new String[]{"a"}).options(Collections.singletonMap(CoreOptions.CONSUMER_ID.key(), "my_consumer")).build();
        Identifier create = Identifier.create(FlinkTestBase.CURRENT_DATABASE, "t");
        createCatalog.createDatabase(FlinkTestBase.CURRENT_DATABASE, false);
        createCatalog.createTable(create, build, false);
        this.table = createCatalog.getTable(create);
    }

    private void writeToTable(int i, int i2, int i3) throws Exception {
        BatchWriteBuilder newBatchWriteBuilder = this.table.newBatchWriteBuilder();
        BatchTableWrite newWrite = newBatchWriteBuilder.newWrite();
        newWrite.write(GenericRow.of(new Object[]{Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3)}));
        newBatchWriteBuilder.newCommit().commit(newWrite.prepareCommit());
        newWrite.close();
    }

    private List<List<Integer>> readSplit(Split split) throws IOException {
        TableRead newRead = this.table.newReadBuilder().newRead();
        ArrayList arrayList = new ArrayList();
        newRead.createReader(split).forEachRemaining(internalRow -> {
            arrayList.add(Arrays.asList(Integer.valueOf(internalRow.getInt(0)), Integer.valueOf(internalRow.getInt(1)), Integer.valueOf(internalRow.getInt(2))));
        });
        return arrayList;
    }

    @Test
    public void testMonitorFunction() throws Exception {
        MonitorFunction monitorFunction = new MonitorFunction(this.table.newReadBuilder(), 10L);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(new StreamSource(monitorFunction), 1, 1, 0);
        abstractStreamOperatorTestHarness.open();
        OperatorSubtaskState operatorSubtaskState = (OperatorSubtaskState) testReadSplit(monitorFunction, () -> {
            return abstractStreamOperatorTestHarness.snapshot(0L, 0L);
        }, 1, 1, 1);
        MonitorFunction monitorFunction2 = new MonitorFunction(this.table.newReadBuilder(), 10L);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness2 = new AbstractStreamOperatorTestHarness(new StreamSource(monitorFunction2), 1, 1, 0);
        abstractStreamOperatorTestHarness2.initializeState(operatorSubtaskState);
        abstractStreamOperatorTestHarness2.open();
        testReadSplit(monitorFunction2, () -> {
            abstractStreamOperatorTestHarness2.snapshot(1L, 1L);
            abstractStreamOperatorTestHarness2.notifyOfCompletedCheckpoint(1L);
            return null;
        }, 2, 2, 2);
        MonitorFunction monitorFunction3 = new MonitorFunction(this.table.newReadBuilder(), 10L);
        new AbstractStreamOperatorTestHarness(new StreamSource(monitorFunction3), 1, 1, 0).open();
        testReadSplit(monitorFunction3, () -> {
            return null;
        }, 3, 3, 3);
    }

    @Test
    public void testReadOperator() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new ReadOperator(this.table.newReadBuilder()));
        oneInputStreamOperatorTestHarness.setup(InternalSerializers.create(RowType.of(new LogicalType[]{new IntType(), new IntType(), new IntType()})));
        writeToTable(1, 1, 1);
        writeToTable(2, 2, 2);
        List splits = this.table.newReadBuilder().newScan().plan().splits();
        oneInputStreamOperatorTestHarness.open();
        Iterator it = splits.iterator();
        while (it.hasNext()) {
            oneInputStreamOperatorTestHarness.processElement(new StreamRecord((Split) it.next()));
        }
        Assertions.assertThat(new ArrayList(oneInputStreamOperatorTestHarness.getOutput())).containsExactlyInAnyOrder(new Object[]{new StreamRecord(GenericRowData.of(new Object[]{1, 1, 1})), new StreamRecord(GenericRowData.of(new Object[]{2, 2, 2}))});
    }

    private <T> T testReadSplit(MonitorFunction monitorFunction, SupplierWithException<T, Exception> supplierWithException, int i, int i2, int i3) throws Exception {
        Throwable[] thArr = new Throwable[1];
        final ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10);
        DummySourceContext dummySourceContext = new DummySourceContext() { // from class: org.apache.paimon.flink.source.operator.OperatorSourceTest.1
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            public void collect(Split split) {
                arrayBlockingQueue.add(split);
            }
        };
        Thread thread = new Thread(() -> {
            try {
                monitorFunction.run(dummySourceContext);
            } catch (Throwable th) {
                th.printStackTrace();
                thArr[0] = th;
            }
        });
        thread.start();
        writeToTable(i, i2, i3);
        Assertions.assertThat(readSplit((Split) arrayBlockingQueue.poll(1L, TimeUnit.MINUTES))).containsExactlyInAnyOrder(new List[]{Arrays.asList(Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3))});
        T t = (T) supplierWithException.get();
        monitorFunction.cancel();
        thread.join();
        Assertions.assertThat(thArr[0]).isNull();
        return t;
    }
}
