package org.apache.flink.connectors.test.common.testsuites;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connectors.test.common.environment.ClusterControllable;
import org.apache.flink.connectors.test.common.environment.TestEnvironment;
import org.apache.flink.connectors.test.common.external.ExternalContext;
import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
import org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension;
import org.apache.flink.connectors.test.common.junit.extensions.TestCaseInvocationContextProvider;
import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
import org.apache.flink.connectors.test.common.utils.TestDataMatchers;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.util.CloseableIterator;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
@ExtendWith({ConnectorTestingExtension.class, TestLoggerExtension.class, TestCaseInvocationContextProvider.class})
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.class */
public abstract class SourceTestSuiteBase<T> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceTestSuiteBase.class);

    @DisplayName("Test source with single split")
    @TestTemplate
    public void testSourceSingleSplit(TestEnvironment testEnvironment, ExternalContext<T> externalContext) throws Exception {
        LOG.info("Writing test data to split 0");
        List<T> generateAndWriteTestData = generateAndWriteTestData(0, externalContext);
        StreamExecutionEnvironment createExecutionEnvironment = testEnvironment.createExecutionEnvironment();
        LOG.info("Submitting Flink job to test environment");
        CloseableIterator executeAndCollect = createExecutionEnvironment.fromSource(externalContext.createSource(Boundedness.BOUNDED), WatermarkStrategy.noWatermarks(), "Tested Source").setParallelism(1).executeAndCollect("Source Single Split Test");
        Throwable th = null;
        try {
            try {
                LOG.info("Checking test results");
                MatcherAssert.assertThat(executeAndCollect, TestDataMatchers.matchesSplitTestData(generateAndWriteTestData));
                if (executeAndCollect != null) {
                    if (0 == 0) {
                        executeAndCollect.close();
                        return;
                    }
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executeAndCollect != null) {
                if (th != null) {
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executeAndCollect.close();
                }
            }
            throw th4;
        }
    }

    @DisplayName("Test source with multiple splits")
    @TestTemplate
    public void testMultipleSplits(TestEnvironment testEnvironment, ExternalContext<T> externalContext) throws Exception {
        ArrayList arrayList = new ArrayList();
        LOG.info("Writing test data to split 0 to 3...");
        for (int i = 0; i < 4; i++) {
            arrayList.add(generateAndWriteTestData(i, externalContext));
        }
        LOG.info("Submitting Flink job to test environment");
        CloseableIterator executeAndCollect = testEnvironment.createExecutionEnvironment().fromSource(externalContext.createSource(Boundedness.BOUNDED), WatermarkStrategy.noWatermarks(), "Tested Source").setParallelism(4).executeAndCollect("Source Multiple Split Test");
        Throwable th = null;
        try {
            try {
                LOG.info("Checking test results");
                MatcherAssert.assertThat(executeAndCollect, TestDataMatchers.matchesMultipleSplitTestData(arrayList));
                if (executeAndCollect != null) {
                    if (0 == 0) {
                        executeAndCollect.close();
                        return;
                    }
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executeAndCollect != null) {
                if (th != null) {
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executeAndCollect.close();
                }
            }
            throw th4;
        }
    }

    @DisplayName("Test source with at least one idle parallelism")
    @TestTemplate
    public void testIdleReader(TestEnvironment testEnvironment, ExternalContext<T> externalContext) throws Exception {
        ArrayList arrayList = new ArrayList();
        LOG.info("Writing test data to split 0 to 3");
        for (int i = 0; i < 4; i++) {
            arrayList.add(generateAndWriteTestData(i, externalContext));
        }
        LOG.info("Submitting Flink job to test environment");
        CloseableIterator executeAndCollect = testEnvironment.createExecutionEnvironment().fromSource(externalContext.createSource(Boundedness.BOUNDED), WatermarkStrategy.noWatermarks(), "Tested Source").setParallelism(5).executeAndCollect("Idle Reader Test");
        Throwable th = null;
        try {
            try {
                LOG.info("Checking test results");
                MatcherAssert.assertThat(executeAndCollect, TestDataMatchers.matchesMultipleSplitTestData(arrayList));
                if (executeAndCollect != null) {
                    if (0 == 0) {
                        executeAndCollect.close();
                        return;
                    }
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (executeAndCollect != null) {
                if (th != null) {
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    executeAndCollect.close();
                }
            }
            throw th4;
        }
    }

    @DisplayName("Test TaskManager failure")
    @TestTemplate
    public void testTaskManagerFailure(TestEnvironment testEnvironment, ExternalContext<T> externalContext, ClusterControllable clusterControllable) throws Exception {
        LOG.info("Writing test data to split {}", 0);
        List<T> generateTestData = externalContext.generateTestData(0, ThreadLocalRandom.current().nextLong());
        SourceSplitDataWriter<T> createSourceSplitDataWriter = externalContext.createSourceSplitDataWriter();
        createSourceSplitDataWriter.writeRecords(generateTestData);
        StreamExecutionEnvironment createExecutionEnvironment = testEnvironment.createExecutionEnvironment();
        createExecutionEnvironment.enableCheckpointing(50L);
        DataStreamSource parallelism = createExecutionEnvironment.fromSource(externalContext.createSource(Boundedness.CONTINUOUS_UNBOUNDED), WatermarkStrategy.noWatermarks(), "Tested Source").setParallelism(1);
        TypeSerializer createSerializer = parallelism.getType().createSerializer(createExecutionEnvironment.getConfig());
        String str = "dataStreamCollect_" + UUID.randomUUID();
        CollectSinkOperatorFactory collectSinkOperatorFactory = new CollectSinkOperatorFactory(createSerializer, str);
        CollectResultIterator collectResultIterator = new CollectResultIterator(collectSinkOperatorFactory.getOperator().getOperatorIdFuture(), createSerializer, str, createExecutionEnvironment.getCheckpointConfig());
        CollectStreamSink collectStreamSink = new CollectStreamSink(parallelism, collectSinkOperatorFactory);
        collectStreamSink.name("Data stream collect sink");
        createExecutionEnvironment.addOperator(collectStreamSink.getTransformation());
        LOG.info("Submitting Flink job to test environment");
        JobClient executeAsync = createExecutionEnvironment.executeAsync("TaskManager Failover Test");
        collectResultIterator.setJobClient(executeAsync);
        LOG.info("Checking records before killing TaskManagers");
        MatcherAssert.assertThat(collectResultIterator, TestDataMatchers.matchesSplitTestData(generateTestData, generateTestData.size()));
        LOG.info("Trigger TaskManager failover");
        clusterControllable.triggerTaskManagerFailover(executeAsync, () -> {
        });
        LOG.info("Waiting for job recovering from failure");
        CommonTestUtils.waitForJobStatus(executeAsync, Collections.singletonList(JobStatus.RUNNING), Deadline.fromNow(Duration.ofSeconds(30L)));
        LOG.info("Writing test data to split {}", 0);
        List<T> generateTestData2 = externalContext.generateTestData(0, ThreadLocalRandom.current().nextLong());
        createSourceSplitDataWriter.writeRecords(generateTestData2);
        LOG.info("Checking records after job failover");
        MatcherAssert.assertThat(collectResultIterator, TestDataMatchers.matchesSplitTestData(generateTestData2, generateTestData2.size()));
        collectResultIterator.close();
        CommonTestUtils.terminateJob(executeAsync, Duration.ofSeconds(30L));
        CommonTestUtils.waitForJobStatus(executeAsync, Collections.singletonList(JobStatus.CANCELED), Deadline.fromNow(Duration.ofSeconds(30L)));
    }

    protected List<T> generateAndWriteTestData(int i, ExternalContext<T> externalContext) {
        List<T> generateTestData = externalContext.generateTestData(i, ThreadLocalRandom.current().nextLong());
        LOG.debug("Writing {} records to external system", Integer.valueOf(generateTestData.size()));
        externalContext.createSourceSplitDataWriter().writeRecords(generateTestData);
        return generateTestData;
    }
}
