/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.table.testutils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.TableTestMatchers;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.hamcrest.Matcher;
import org.junit.Assert;

public class PulsarTableTestUtils {
    public static List<Row> collectRows(Table table, int expectedSize) throws Exception {
        TableResult result = table.execute();
        ArrayList<Row> collectedRows = new ArrayList<Row>();
        try (CloseableIterator iterator = result.collect();){
            while (collectedRows.size() < expectedSize && iterator.hasNext()) {
                collectedRows.add((Row)iterator.next());
            }
        }
        result.getJobClient().ifPresent(jc -> {
            try {
                jc.cancel().get(5L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return collectedRows;
    }

    public static void comparedWithKeyAndOrder(Map<Row, List<Row>> expectedData, List<Row> actual, int[] keyLoc) {
        HashMap<Row, LinkedList> actualData = new HashMap<Row, LinkedList>();
        for (Row row : actual) {
            Row key = Row.project((Row)row, (int[])keyLoc);
            key.setKind(RowKind.INSERT);
            actualData.computeIfAbsent(key, k -> new LinkedList()).add(row);
        }
        Assert.assertEquals((String)("Actual result: " + actual), (long)expectedData.size(), (long)actualData.size());
        for (Row key : expectedData.keySet()) {
            Assert.assertThat(actualData.get(key), (Matcher)TableTestMatchers.deepEqualTo(expectedData.get(key), (boolean)false));
        }
    }

    public static void waitingExpectedResults(String sinkName, List<String> expected, Duration timeout) throws InterruptedException, TimeoutException {
        Collections.sort(expected);
        CommonTestUtils.waitUtil(() -> {
            List actual = TestValuesTableFactory.getResults((String)sinkName);
            Collections.sort(actual);
            return expected.equals(actual);
        }, (Duration)timeout, (String)"Can not get the expected result.");
    }
}

