package org.apache.flink.cdc.connectors.mongodb.utils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.junit.Assert;

/**
 * Static helpers for tests of the MongoDB CDC connector: polling the results collected by
 * {@link TestValuesTableFactory} sinks, draining result iterators into string lists, and injecting
 * task-manager / job-manager failovers into a {@link MiniCluster}.
 */
public class MongoDBTestUtils {

    /**
     * Point in a job's lifecycle at which a test injects a failover.
     *
     * <p>The constants are not consumed inside this class. NOTE(review): presumably
     * {@code SNAPSHOT} and {@code STREAM} name the connector's two reading phases and
     * {@code NEVER} disables injection; confirm against the calling tests.
     */
    public enum FailoverPhase {
        SNAPSHOT,
        STREAM,
        NEVER
    }

    /**
     * Kind of failover that {@link #triggerFailover} performs: {@code TM} restarts a task manager,
     * {@code JM} revokes and re-grants job master leadership, {@code NONE} does nothing.
     */
    public enum FailoverType {
        TM,
        JM,
        NONE
    }

    /**
     * Blocks until the sink named {@code str} reports at least one raw result, polling every
     * 100 ms.
     *
     * <p>There is no timeout: the call only returns once {@link #sinkSize} is non-zero or the
     * thread is interrupted.
     *
     * @param str sink name passed to {@link TestValuesTableFactory#getRawResults}
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    public static void waitForSnapshotStarted(String str) throws InterruptedException {
        while (sinkSize(str) == 0) {
            Thread.sleep(100L);
        }
    }

    /**
     * Waits up to 10 minutes for the sink named {@code str} to hold at least {@code i} raw
     * results.
     *
     * @param str sink name
     * @param i minimum number of raw results to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     * @see #waitForSinkSize(String, int, long, TimeUnit)
     */
    public static void waitForSinkSize(String str, int i) throws InterruptedException {
        waitForSinkSize(str, i, 10L, TimeUnit.MINUTES);
    }

    /**
     * Waits until the sink named {@code str} holds at least {@code i} raw results, polling every
     * 100 ms, and fails the test via {@link Assert#fail(String)} once the timeout has elapsed.
     *
     * @param str sink name
     * @param i minimum number of raw results to wait for
     * @param j timeout, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    public static void waitForSinkSize(String str, int i, long j, TimeUnit timeUnit) throws InterruptedException {
        final long startNanos = System.nanoTime();
        final long timeoutNanos = timeUnit.toNanos(j);
        while (sinkSize(str) < i) {
            // Compare elapsed time rather than absolute deadlines: nanoTime values may wrap, and
            // "now + timeout" overflows for very large timeouts, which would fail immediately.
            if (System.nanoTime() - startNanos > timeoutNanos) {
                Assert.fail("Wait for sink size timeout, raw results: \n" + rawResultsForMessage(str));
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Returns the number of raw results currently recorded for the sink named {@code str}.
     *
     * <p>The lookup is performed while holding the {@code TestValuesTableFactory.class} monitor.
     *
     * @param str sink name
     * @return the raw result count, or {@code 0} when the lookup throws
     *     {@link IllegalArgumentException}
     */
    public static int sinkSize(String str) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getRawResults(str).size();
            } catch (IllegalArgumentException ignored) {
                // Deliberately treated as "no results".
                // NOTE(review): presumably thrown while the sink is not registered yet - confirm.
                return 0;
            }
        }
    }

    /**
     * Takes up to {@code i} rows from {@code it} and converts each one with {@code function}.
     *
     * <p>All rows are pulled from the iterator first; the conversion is applied afterwards, in
     * iteration order.
     *
     * @param it source of rows; consumed only as far as needed
     * @param i maximum number of rows to take; zero or negative yields an empty list
     * @param function converts a row into its string form
     * @return the converted rows, at most {@code i} of them
     */
    public static List<String> fetchRowData(Iterator<RowData> it, int i, Function<RowData, String> function) {
        // Clamp the capacity so a negative count yields an empty list instead of an exception.
        final List<RowData> rows = new ArrayList<>(Math.max(i, 0));
        int remaining = i;
        while (remaining > 0 && it.hasNext()) {
            rows.add(it.next());
            remaining--;
        }
        return rows.stream().map(function).collect(Collectors.toList());
    }

    /**
     * Takes up to {@code i} rows from {@code it} and returns their {@code toString()} forms.
     *
     * @param it source of rows; consumed only as far as needed
     * @param i maximum number of rows to take; zero or negative yields an empty list
     * @return the string forms of the rows taken, in iteration order
     */
    public static List<String> fetchRows(Iterator<Row> it, int i) {
        // Clamp the capacity so a negative count yields an empty list instead of an exception.
        final List<String> rows = new ArrayList<>(Math.max(i, 0));
        int remaining = i;
        while (remaining > 0 && it.hasNext()) {
            rows.add(it.next().toString());
            remaining--;
        }
        return rows;
    }

    /**
     * Performs the failover selected by {@code failoverType}.
     *
     * @param failoverType {@code TM} delegates to {@link #restartTaskManager}, {@code JM} to
     *     {@link #triggerJobManagerFailover}, {@code NONE} returns without doing anything
     * @param jobID job whose job master leadership is cycled (used for {@code JM} only)
     * @param miniCluster cluster to act on
     * @param runnable callback run between the failure and the recovery step
     * @throws IllegalStateException if {@code failoverType} is not one of the handled constants
     * @throws Exception whatever the delegated failover method throws
     */
    public static void triggerFailover(FailoverType failoverType, JobID jobID, MiniCluster miniCluster, Runnable runnable) throws Exception {
        switch (failoverType) {
            case TM:
                restartTaskManager(miniCluster, runnable);
                break;
            case JM:
                triggerJobManagerFailover(jobID, miniCluster, runnable);
                break;
            case NONE:
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + failoverType);
        }
    }

    /**
     * Revokes job master leadership for {@code jobID}, runs {@code runnable}, then grants the
     * leadership again. Both leadership operations are awaited before continuing.
     *
     * <p>If {@code runnable} throws, leadership is not re-granted by this method.
     *
     * @param jobID job whose job master leadership is revoked and re-granted
     * @param miniCluster cluster providing the {@link HaLeadershipControl}
     * @param runnable callback run while leadership is revoked
     * @throws Exception if the leadership control is unavailable or a leadership operation fails
     */
    public static void triggerJobManagerFailover(JobID jobID, MiniCluster miniCluster, Runnable runnable) throws Exception {
        final HaLeadershipControl leadershipControl = (HaLeadershipControl) miniCluster.getHaLeadershipControl().get();
        leadershipControl.revokeJobMasterLeadership(jobID).get();
        runnable.run();
        leadershipControl.grantJobMasterLeadership(jobID).get();
    }

    /**
     * Terminates the task manager at index 0 and waits for that to complete, runs
     * {@code runnable}, then starts a task manager.
     *
     * <p>If {@code runnable} throws, no task manager is started by this method.
     *
     * @param miniCluster cluster whose task manager is cycled
     * @param runnable callback run after the task manager has been terminated
     * @throws Exception if terminating or starting the task manager fails
     */
    public static void restartTaskManager(MiniCluster miniCluster, Runnable runnable) throws Exception {
        miniCluster.terminateTaskManager(0).get();
        runnable.run();
        miniCluster.startTaskManager();
    }

    /**
     * Renders the raw results of the sink named {@code str} for use in a failure message, one
     * result per line.
     *
     * <p>The lookup can throw {@link IllegalArgumentException} (see {@link #sinkSize}); it is
     * caught here so that it cannot replace the assertion failure being reported. The read holds
     * the same monitor that {@link #sinkSize} uses.
     */
    private static String rawResultsForMessage(String str) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return String.join("\n", TestValuesTableFactory.getRawResults(str));
            } catch (IllegalArgumentException e) {
                return "<raw results unavailable: " + e.getMessage() + ">";
            }
        }
    }
}
