package org.apache.paimon.flink;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.utils.FailingFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.class */
public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
    /** Base directory used as the warehouse location; assigned from {@code getTempDirPath()} before each test. */
    private String path;
    /** Options emitted as {@code 'table-default.<key>' = '<value>'} entries by {@code createCatalogSql}. */
    private Map<String, String> tableDefaultProperties;
    /** Number of partitions the final (deterministic) records are spread over; outer loop bound in {@code ResultChecker.assertResult}. */
    private static final int NUM_PARTS = 4;
    /** Number of distinct keys each producer writes per partition. */
    private static final int NUM_KEYS = 64;
    /** Exclusive upper bound of the random {@code v1} values generated for the non-final records. */
    private static final int NUM_VALUES = 1024;
    // NOTE(review): not referenced by name in this decompiled source; it appears to correspond to the
    // literal 10000 used as the sequence threshold in testRandom / checkChangelogTestResult - confirm.
    private static final int LIMIT = 10000;

    /**
     * Compiler-generated lookup table backing a {@code switch} over {@link RowKind}.
     *
     * <p>The array is indexed by {@code RowKind.ordinal()} and holds the dense case label used by
     * the switch: 1 = INSERT, 2 = UPDATE_AFTER, 3 = UPDATE_BEFORE, 4 = DELETE. Slots that are never
     * assigned keep the default value 0, which matches no case label.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$types$RowKind = new int[RowKind.values().length];

        static {
            // Every entry is guarded on its own so that a NoSuchFieldError raised for one constant
            // does not prevent the remaining entries from being populated.
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not present at run time; leave the slot at 0
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_AFTER.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not present at run time; leave the slot at 0
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_BEFORE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant not present at run time; leave the slot at 0
            }
            try {
                // The label is the plain literal 4; it is unrelated to NUM_PARTS, which merely
                // happens to have the same value.
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant not present at run time; leave the slot at 0
            }
        }
    }

    /**
     * Replays changelog rows one by one, checks that each row is a legal successor of the previous
     * row seen for the same key, and finally verifies the materialized table content.
     *
     * <p>A row is expected to have four fields: fields 0 and 1 form the key, fields 2 and 3 form the
     * value. Both are tracked as {@code "a|b"} strings.
     */
    public static class ResultChecker {
        /** Current value per key; a key is absent while it is deleted or between -U and +U. */
        private final Map<String, String> valueMap;
        /** Kind of the most recent row seen per key. */
        private final Map<String, RowKind> kindMap;

        private ResultChecker() {
            this.valueMap = new HashMap<>();
            this.kindMap = new HashMap<>();
        }

        /**
         * Applies a single changelog row and asserts that it is consistent with the state so far.
         *
         * @param row the next changelog row
         * @throws UnsupportedOperationException if the row kind is not one of the four handled kinds
         */
        public void addChangelog(Row row) {
            String key = row.getField(0) + "|" + row.getField(1);
            String value = row.getField(2) + "|" + row.getField(3);
            RowKind kind = row.getKind();
            RowKind previousKind = this.kindMap.get(key);
            switch (kind) {
                case INSERT:
                    // an insert is only legal for a key that was never seen or was deleted
                    Assertions.assertThat(this.valueMap.containsKey(key)).isFalse();
                    Assertions.assertThat(previousKind == null || previousKind == RowKind.DELETE).isTrue();
                    this.valueMap.put(key, value);
                    break;
                case UPDATE_AFTER:
                    // +U must directly follow the -U that retracted the old value
                    Assertions.assertThat(this.valueMap.containsKey(key)).isFalse();
                    Assertions.assertThat(previousKind).isEqualTo(RowKind.UPDATE_BEFORE);
                    this.valueMap.put(key, value);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // a retraction must carry exactly the value that is currently stored
                    Assertions.assertThat(this.valueMap.get(key)).isEqualTo(value);
                    Assertions.assertThat(previousKind == RowKind.INSERT || previousKind == RowKind.UPDATE_AFTER).isTrue();
                    this.valueMap.remove(key);
                    break;
                default:
                    throw new UnsupportedOperationException("Unknown row kind " + kind);
            }
            this.kindMap.put(key, kind);
        }

        /**
         * Asserts that the replayed state holds exactly the expected final records.
         *
         * @param i number of producers; each contributes {@code NUM_KEYS} keys to every partition
         */
        public void assertResult(int i) {
            int keysPerPartition = ChangelogWithKeyFileStoreTableITCase.NUM_KEYS * i;
            Assertions.assertThat(this.valueMap.size()).isEqualTo(ChangelogWithKeyFileStoreTableITCase.NUM_PARTS * keysPerPartition);
            for (int part = 0; part < ChangelogWithKeyFileStoreTableITCase.NUM_PARTS; part++) {
                for (int k = 0; k < keysPerPartition; k++) {
                    int expected = ChangelogWithKeyFileStoreTableITCase.LIMIT
                            + part * ChangelogWithKeyFileStoreTableITCase.NUM_KEYS
                            + k % ChangelogWithKeyFileStoreTableITCase.NUM_KEYS;
                    Assertions.assertThat(this.valueMap.get(part + "|" + k)).isEqualTo(expected + "|" + expected + ".str");
                }
            }
        }

        /* Synthetic accessor constructor; kept so existing {@code new ResultChecker(null)} call sites keep compiling. */
        /* synthetic */ ResultChecker(AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /**
     * Prepares a fresh warehouse directory and the table default options for one test.
     *
     * <p>With probability one half the local merge buffer option is set to {@code 256 kb}, so
     * different runs exercise the tests both with and without it.
     */
    @BeforeEach
    public void before() throws IOException {
        path = getTempDirPath();
        tableDefaultProperties = new HashMap<>();
        boolean useLocalMergeBuffer = ThreadLocalRandom.current().nextBoolean();
        if (useLocalMergeBuffer) {
            String optionKey = CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key();
            tableDefaultProperties.put(optionKey, "256 kb");
        }
    }

    /** Creates a new table environment running in batch mode. */
    private TableEnvironment createBatchTableEnvironment() {
        EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().inBatchMode().build();
        return TableEnvironment.create(batchSettings);
    }

    /**
     * Creates a new table environment running in streaming mode.
     *
     * @param i checkpoint interval in milliseconds
     */
    private TableEnvironment createStreamingTableEnvironment(int i) {
        EnvironmentSettings streamingSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment sEnv = TableEnvironment.create(streamingSettings);
        Duration checkpointInterval = Duration.ofMillis(i);
        sEnv.getConfig()
                .getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, checkpointInterval);
        return sEnv;
    }

    /**
     * Obtains a DataStream execution environment with exactly-once checkpointing enabled.
     *
     * @param i checkpoint interval in milliseconds
     */
    private StreamExecutionEnvironment createStreamExecutionEnvironment(int i) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(i);
        return env;
    }

    /**
     * Builds the {@code CREATE CATALOG} statement for a Paimon catalog.
     *
     * <p>Every entry of {@code tableDefaultProperties} is appended as a
     * {@code 'table-default.<key>' = '<value>'} option; nothing is appended when the map is empty.
     *
     * @param str catalog name
     * @param str2 warehouse path
     * @return the SQL statement
     */
    private String createCatalogSql(String str, String str2) {
        String defaultOptions = "";
        if (!tableDefaultProperties.isEmpty()) {
            defaultOptions =
                    ", "
                            + tableDefaultProperties.entrySet().stream()
                                    .map(option -> String.format("'table-default.%s' = '%s'", option.getKey(), option.getValue()))
                                    .collect(Collectors.joining(", "));
        }
        return String.format(
                "CREATE CATALOG `%s` WITH ( 'type' = 'paimon', 'warehouse' = '%s' %s )",
                str, str2, defaultOptions);
    }

    /** Runs the changelog scenario with the full-compaction producer triggered every 3 delta commits. */
    @Timeout(1200)
    @Test
    public void testFullCompactionTriggerInterval() throws Exception {
        List<String> tableOptions =
                Arrays.asList(
                        "'changelog-producer' = 'full-compaction'",
                        "'full-compaction.delta-commits' = '3'");
        innerTestChangelogProducing(tableOptions);
    }

    /**
     * Writes to a write-only table for about ten seconds while a stand-alone compaction job with a
     * 2000 ms checkpoint interval is running, then checks that the compaction job is still
     * {@code RUNNING} and that a streaming reader observes every inserted row, in key order, as an
     * insert.
     *
     * <p>The streaming result iterator is held in a try-with-resources statement so that it is
     * closed even when an insert or an assertion fails.
     */
    @Timeout(1200)
    @Test
    public void testFullCompactionWithLongCheckpointInterval() throws Exception {
        // create the table through a batch environment, which is also used for the inserts below
        TableEnvironment bEnv = createBatchTableEnvironment();
        bEnv.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        bEnv.executeSql(createCatalogSql("testCatalog", this.path));
        bEnv.executeSql("USE CATALOG testCatalog");
        bEnv.executeSql("CREATE TABLE T (  k INT,  v INT,  PRIMARY KEY (k) NOT ENFORCED) WITH (  'bucket' = '1',  'changelog-producer' = 'full-compaction',  'write-only' = 'true')");

        // streaming reader
        TableEnvironment sEnv = createStreamingTableEnvironment(100);
        sEnv.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        sEnv.executeSql(createCatalogSql("testCatalog", this.path));
        sEnv.executeSql("USE CATALOG testCatalog");

        try (CloseableIterator<Row> changelog = sEnv.executeSql("SELECT * FROM T").collect()) {
            // stand-alone compaction job; it must not restart, otherwise the RUNNING check is moot
            StreamExecutionEnvironment env = createStreamExecutionEnvironment(2000);
            env.setParallelism(1);
            env.setRestartStrategy(RestartStrategies.noRestart());
            new CompactAction(this.path, FlinkTestBase.CURRENT_DATABASE, "T").build(env);
            JobClient compactJob = env.executeAsync();

            // keep writing one record per batch job for a while
            long startMs = System.currentTimeMillis();
            int lastKey = 0;
            while (System.currentTimeMillis() - startMs <= 10000) {
                lastKey++;
                bEnv.executeSql(String.format("INSERT INTO T VALUES (%d, %d)", lastKey, lastKey * 100)).await();
            }

            Assertions.assertThat(compactJob.getJobStatus().get()).isEqualTo(JobStatus.RUNNING);

            for (int key = 1; key <= lastKey; key++) {
                Assertions.assertThat(changelog.hasNext()).isTrue();
                Assertions.assertThat(changelog.next().toString()).isEqualTo(String.format("+I[%d, %d]", key, key * 100));
            }
        }
    }

    /** Runs the changelog scenario with the lookup changelog producer. */
    @Timeout(1200)
    @Test
    public void testLookupChangelog() throws Exception {
        List<String> tableOptions = Collections.singletonList("'changelog-producer' = 'lookup'");
        innerTestChangelogProducing(tableOptions);
    }

    /**
     * Feeds a filesystem source into a grouped aggregation that writes to table {@code T} and
     * checks the changelog a streaming reader of {@code T} receives.
     *
     * <p>Two batches of four source rows are written. After the first batch four inserts are
     * expected; after the second batch, which changes every group's sum, eight changelog rows are
     * expected. Within a batch the order of the rows is not checked.
     *
     * <p>The streaming result iterator is held in a try-with-resources statement so that it is
     * closed even when an insert or an assertion fails.
     *
     * @param list extra {@code WITH} options of table {@code T}, each already formatted as
     *     {@code 'key' = 'value'}
     */
    private void innerTestChangelogProducing(List<String> list) throws Exception {
        TableEnvironment sEnv = createStreamingTableEnvironment(ThreadLocalRandom.current().nextInt(900) + 100);
        sEnv.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        sEnv.executeSql(createCatalogSql("testCatalog", this.path + "/warehouse"));
        sEnv.executeSql("USE CATALOG testCatalog");
        sEnv.executeSql("CREATE TABLE T ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'bucket' = '2', " + String.join(",", list) + ")");

        // source table backed by a local directory that is monitored for new files
        Path inputPath = new Path(this.path, "input");
        LocalFileIO.create().mkdirs(inputPath);
        sEnv.executeSql("CREATE TABLE `default_catalog`.`default_database`.`S` ( i INT, g STRING ) WITH ( 'connector' = 'filesystem', 'format' = 'testcsv', 'path' = '" + inputPath + "', 'source.monitor-interval' = '500ms' )");

        sEnv.executeSql("INSERT INTO T SELECT SUM(i) AS k, g AS v FROM `default_catalog`.`default_database`.`S` GROUP BY g");

        try (CloseableIterator<Row> changelog = sEnv.executeSql("SELECT * FROM T").collect()) {
            // first batch: four groups appear, so four rows are read
            sEnv.executeSql("INSERT INTO `default_catalog`.`default_database`.`S` VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D')").await();
            List<String> actual = new ArrayList<>();
            for (int n = 0; n < 4; n++) {
                actual.add(changelog.next().toString());
            }
            Assertions.assertThat(actual).containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]", "+I[3, C]", "+I[4, D]");

            // second batch: every group's sum grows by one, so eight changelog rows are read
            sEnv.executeSql("INSERT INTO `default_catalog`.`default_database`.`S` VALUES (1, 'A'), (1, 'B'), (1, 'C'), (1, 'D')").await();
            actual.clear();
            for (int n = 0; n < 8; n++) {
                actual.add(changelog.next().toString());
            }
            Assertions.assertThat(actual).containsExactlyInAnyOrder("-D[1, A]", "-U[2, B]", "+U[2, A]", "-U[3, C]", "+U[3, B]", "-U[4, D]", "+U[4, C]", "+I[5, D]");
        }
    }

    /** Random-write test without changelog producer, in batch mode: one producer, no injected failures. */
    @Timeout(1200)
    @Test
    public void testNoChangelogProducerBatchRandom() throws Exception {
        TableEnvironment bEnv = createBatchTableEnvironment();
        testNoChangelogProducerRandom(bEnv, 1, false);
    }

    /**
     * Random-write test without changelog producer, in streaming mode. The checkpoint interval
     * (100-999 ms), the number of producers (1 or 2) and the failure flag are chosen randomly.
     */
    @Timeout(1200)
    @Test
    public void testNoChangelogProducerStreamingRandom() throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int checkpointIntervalMs = random.nextInt(900) + 100;
        TableEnvironment sEnv = createStreamingTableEnvironment(checkpointIntervalMs);
        int numProducers = random.nextInt(1, 3);
        boolean enableFailure = random.nextBoolean();
        testNoChangelogProducerRandom(sEnv, numProducers, enableFailure);
    }

    /** Random-write test with the full-compaction producer, in batch mode: one producer, no injected failures. */
    @Timeout(1200)
    @Test
    public void testFullCompactionChangelogProducerBatchRandom() throws Exception {
        TableEnvironment bEnv = createBatchTableEnvironment();
        testFullCompactionChangelogProducerRandom(bEnv, 1, false);
    }

    /**
     * Random-write test with the full-compaction producer, in streaming mode. The checkpoint
     * interval (100-999 ms), the number of producers (1 or 2) and the failure flag are chosen
     * randomly.
     */
    @Timeout(1200)
    @Test
    public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int checkpointIntervalMs = random.nextInt(900) + 100;
        TableEnvironment sEnv = createStreamingTableEnvironment(checkpointIntervalMs);
        int numProducers = random.nextInt(1, 3);
        boolean enableFailure = random.nextBoolean();
        testFullCompactionChangelogProducerRandom(sEnv, numProducers, enableFailure);
    }

    /**
     * Random-write test with write-only writers and stand-alone compaction jobs producing a
     * full-compaction changelog. The checkpoint interval (100-999 ms), the number of producers
     * (1 or 2) and whether a second compaction job is started are chosen randomly.
     */
    @Timeout(1200)
    @Test
    public void testStandAloneFullCompactJobRandom() throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int checkpointIntervalMs = random.nextInt(900) + 100;
        TableEnvironment sEnv = createStreamingTableEnvironment(checkpointIntervalMs);
        int numProducers = random.nextInt(1, 3);
        boolean twoCompactJobs = random.nextBoolean();
        testStandAloneFullCompactJobRandom(sEnv, numProducers, twoCompactJobs);
    }

    /** Random-write test with the lookup producer, in batch mode: one producer, no injected failures. */
    @Timeout(1200)
    @Test
    public void testLookupChangelogProducerBatchRandom() throws Exception {
        TableEnvironment bEnv = createBatchTableEnvironment();
        testLookupChangelogProducerRandom(bEnv, 1, false);
    }

    /**
     * Random-write test with the lookup producer, in streaming mode. The checkpoint interval
     * (100-999 ms), the number of producers (1 or 2) and the failure flag are chosen randomly.
     */
    @Timeout(1200)
    @Test
    public void testLookupChangelogProducerStreamingRandom() throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int checkpointIntervalMs = random.nextInt(900) + 100;
        TableEnvironment sEnv = createStreamingTableEnvironment(checkpointIntervalMs);
        int numProducers = random.nextInt(1, 3);
        boolean enableFailure = random.nextBoolean();
        testLookupChangelogProducerRandom(sEnv, numProducers, enableFailure);
    }

    /**
     * Random-write test with write-only writers and stand-alone compaction jobs producing a lookup
     * changelog. The checkpoint interval (100-999 ms), the number of producers (1 or 2) and whether
     * a second compaction job is started are chosen randomly.
     */
    @Timeout(1200)
    @Test
    public void testStandAloneLookupJobRandom() throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int checkpointIntervalMs = random.nextInt(900) + 100;
        TableEnvironment sEnv = createStreamingTableEnvironment(checkpointIntervalMs);
        int numProducers = random.nextInt(1, 3);
        boolean twoCompactJobs = random.nextBoolean();
        testStandAloneLookupJobRandom(sEnv, numProducers, twoCompactJobs);
    }

    /**
     * Starts the random producers on a 4-bucket table without changelog producer, waits for all of
     * them to finish and verifies the table content with a batch read.
     *
     * @param tableEnvironment environment the producers are submitted from
     * @param i number of producers
     * @param z whether artificial file IO failures are enabled
     */
    private void testNoChangelogProducerRandom(TableEnvironment tableEnvironment, int i, boolean z) throws Exception {
        List<TableResult> insertJobs = testRandom(tableEnvironment, i, z, "'bucket' = '4'");
        for (TableResult insertJob : insertJobs) {
            insertJob.await();
        }
        checkBatchResult(i);
    }

    /**
     * Starts the random producers on a table using the full-compaction changelog producer, waits a
     * random time of up to five seconds and then verifies the changelog while the producers may
     * still be running.
     *
     * @param tableEnvironment environment the producers are submitted from
     * @param i number of producers
     * @param z whether artificial file IO failures are enabled
     */
    private void testFullCompactionChangelogProducerRandom(TableEnvironment tableEnvironment, int i, boolean z) throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        String writeBufferSize = random.nextBoolean() ? "512kb" : "1mb";
        String tableOptions =
                "'bucket' = '4',"
                        + "'write-buffer-size' = '" + writeBufferSize + "',"
                        + "'changelog-producer' = 'full-compaction','full-compaction.delta-commits' = '3'";
        testRandom(tableEnvironment, i, z, tableOptions);

        // let the producers run for a while before the reader is attached
        long delayMs = random.nextInt(5000);
        Thread.sleep(delayMs);
        checkChangelogTestResult(i);
    }

    /**
     * Starts the random producers on a table using the lookup changelog producer, waits a random
     * time of up to five seconds and then verifies the changelog while the producers may still be
     * running.
     *
     * @param tableEnvironment environment the producers are submitted from
     * @param i number of producers
     * @param z whether artificial file IO failures are enabled
     */
    private void testLookupChangelogProducerRandom(TableEnvironment tableEnvironment, int i, boolean z) throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        String writeBufferSize = random.nextBoolean() ? "512kb" : "1mb";
        String tableOptions =
                "'bucket' = '4',"
                        + "'write-buffer-size' = '" + writeBufferSize + "',"
                        + "'changelog-producer' = 'lookup'";
        testRandom(tableEnvironment, i, z, tableOptions);

        // let the producers run for a while before the reader is attached
        long delayMs = random.nextInt(5000);
        Thread.sleep(delayMs);
        checkChangelogTestResult(i);
    }

    /**
     * Starts write-only random producers (without injected failures) on a full-compaction table,
     * launches one or two stand-alone compaction jobs and then verifies the changelog.
     *
     * @param tableEnvironment environment the producers are submitted from
     * @param i number of producers
     * @param z when {@code true} two compaction jobs are started instead of one
     */
    private void testStandAloneFullCompactJobRandom(TableEnvironment tableEnvironment, int i, boolean z) throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        String writeBufferSize = random.nextBoolean() ? "512kb" : "1mb";
        String tableOptions =
                "'bucket' = '4',"
                        + "'write-buffer-size' = '" + writeBufferSize + "',"
                        + "'changelog-producer' = 'full-compaction','full-compaction.delta-commits' = '3','write-only' = 'true'";
        testRandom(tableEnvironment, i, false, tableOptions);

        // sleep for a random amount of time so the compaction jobs start at an arbitrary point
        Thread.sleep(random.nextInt(2500));

        int numCompactJobs = z ? 2 : 1;
        for (int job = 0; job < numCompactJobs; job++) {
            int checkpointIntervalMs = random.nextInt(1900) + 100;
            StreamExecutionEnvironment env = createStreamExecutionEnvironment(checkpointIntervalMs);
            env.setParallelism(2);
            CompactAction compactAction = new CompactAction(this.path, FlinkTestBase.CURRENT_DATABASE, "T");
            compactAction.build(env);
            env.executeAsync();
        }

        // sleep again so the reader is attached at an arbitrary point as well
        Thread.sleep(random.nextInt(2500));
        checkChangelogTestResult(i);
    }

    /**
     * Starts write-only random producers (without injected failures) on a lookup-changelog table,
     * launches one or two stand-alone compaction jobs and then verifies the changelog.
     *
     * @param tableEnvironment environment the producers are submitted from
     * @param i number of producers
     * @param z when {@code true} two compaction jobs are started instead of one
     */
    private void testStandAloneLookupJobRandom(TableEnvironment tableEnvironment, int i, boolean z) throws Exception {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        String writeBufferSize = random.nextBoolean() ? "512kb" : "1mb";
        String tableOptions =
                "'bucket' = '4',"
                        + "'write-buffer-size' = '" + writeBufferSize + "',"
                        + "'changelog-producer' = 'lookup','write-only' = 'true'";
        testRandom(tableEnvironment, i, false, tableOptions);

        // sleep for a random amount of time so the compaction jobs start at an arbitrary point
        Thread.sleep(random.nextInt(2500));

        int numCompactJobs = z ? 2 : 1;
        for (int job = 0; job < numCompactJobs; job++) {
            int checkpointIntervalMs = random.nextInt(1900) + 100;
            StreamExecutionEnvironment env = createStreamExecutionEnvironment(checkpointIntervalMs);
            env.setParallelism(2);
            CompactAction compactAction = new CompactAction(this.path, FlinkTestBase.CURRENT_DATABASE, "T");
            compactAction.build(env);
            env.executeAsync();
        }

        // sleep again so the reader is attached at an arbitrary point as well
        Thread.sleep(random.nextInt(2500));
        checkChangelogTestResult(i);
    }

    /**
     * Streams the changelog of table {@code T}, validates every row with a {@link ResultChecker}
     * and stops once all final records have been seen; afterwards the materialized state and a
     * fresh batch read are both verified.
     *
     * <p>A row counts as a final record when its third field ({@code v1}) is at least
     * {@code LIMIT}. Reading stops after {@code i * NUM_PARTS * NUM_KEYS} such rows.
     *
     * <p>The result iterator is managed by try-with-resources, so it is closed on every path and a
     * failure while closing never replaces an assertion failure raised in the loop.
     *
     * @param i number of producers that wrote to the table
     */
    private void checkChangelogTestResult(int i) throws Exception {
        TableEnvironment sEnv = createStreamingTableEnvironment(100);
        sEnv.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        sEnv.executeSql(createCatalogSql("testCatalog", this.path));
        sEnv.executeSql("USE CATALOG testCatalog");

        ResultChecker checker = new ResultChecker();
        int expectedFinalRecords = i * NUM_PARTS * NUM_KEYS;
        int seenFinalRecords = 0;
        try (CloseableIterator<Row> changelog = sEnv.executeSql("SELECT * FROM T").collect()) {
            while (changelog.hasNext()) {
                Row row = changelog.next();
                checker.addChangelog(row);
                long v1 = (Long) row.getField(2);
                if (v1 >= LIMIT) {
                    seenFinalRecords++;
                    if (seenFinalRecords == expectedFinalRecords) {
                        // the stream is unbounded, so reading must stop explicitly
                        break;
                    }
                }
            }
        }

        checker.assertResult(i);
        checkBatchResult(i);
    }

    /**
     * Creates the partitioned table {@code T} in a catalog backed by a failing file IO, creates a
     * datagen sequence source {@code S} and submits one insert job per producer.
     *
     * <p>Source values below 10000 are turned into random partition/key/value combinations; values
     * from 10000 upwards are turned into the deterministic final record of each key. Producer
     * {@code n} writes keys shifted by {@code n * NUM_KEYS}.
     *
     * @param tableEnvironment environment all statements are executed in
     * @param i number of producers (insert jobs) to start
     * @param z whether the failing file IO is re-armed before the producers start
     * @param str {@code WITH} options of table {@code T}
     * @return the results of the submitted insert jobs, in submission order
     */
    private List<TableResult> testRandom(TableEnvironment tableEnvironment, int i, boolean z, String str) throws Exception {
        String failingName = UUID.randomUUID().toString();
        String failingPath = FailingFileIO.getFailingPath(failingName, this.path);
        // no failure while creating catalog and table
        FailingFileIO.reset(failingName, 0, 1);

        tableEnvironment.executeSql(createCatalogSql("testCatalog", failingPath));
        tableEnvironment.executeSql("USE CATALOG testCatalog");
        tableEnvironment.executeSql("CREATE TABLE T(  pt STRING,  k INT,  v1 BIGINT,  v2 STRING,  PRIMARY KEY (pt, k) NOT ENFORCED) PARTITIONED BY (pt) WITH (" + str + ")");
        tableEnvironment.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);

        int rowsPerSecond = 500 + ThreadLocalRandom.current().nextInt(500);
        tableEnvironment.executeSql("CREATE TABLE `default_catalog`.`default_database`.`S` (  i INT) WITH (  'connector' = 'datagen',  'fields.i.kind' = 'sequence',  'fields.i.start' = '0',  'fields.i.end' = '10255',  'number-of-rows' = '10256',  'rows-per-second' = '" + rowsPerSecond + "')").await();

        List<TableResult> insertJobs = new ArrayList<>();
        if (z) {
            FailingFileIO.reset(failingName, 2, 10000);
        }

        // these column expressions are the same for every producer
        String ptColumn = String.format("IF(i >= %d, CAST((i - %d) / %d AS STRING), CAST(CAST(FLOOR(RAND() * %d) AS INT) AS STRING)) AS pt", 10000, 10000, NUM_KEYS, NUM_PARTS);
        String v1Column = String.format("IF(i >= %d, i, CAST(FLOOR(RAND() * %d) AS BIGINT)) AS v1", 10000, NUM_VALUES);
        String v2Column = "CAST(i AS STRING) || '.str' AS v2";

        for (int producer = 0; producer < i; producer++) {
            // each producer owns a disjoint key range
            String kColumn = String.format("IF(i >= %d, MOD(i - %d, %d), CAST(FLOOR(RAND() * %d) AS INT)) + %d AS k", 10000, 10000, NUM_KEYS, NUM_KEYS, producer * NUM_KEYS);
            tableEnvironment.executeSql(String.format("CREATE TEMPORARY VIEW myView%d AS SELECT %s, %s, %s, %s FROM `default_catalog`.`default_database`.`S`", producer, ptColumn, kColumn, v1Column, v2Column));

            String insertSql = "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM myView" + producer;
            insertJobs.add((TableResult) FailingFileIO.retryArtificialException(() -> {
                return tableEnvironment.executeSql(insertSql);
            }));
        }
        return insertJobs;
    }

    /**
     * Reads table {@code T} in batch mode through the plain (non-failing) warehouse path, feeds
     * every row into a {@link ResultChecker} and asserts that exactly the expected final records
     * are present.
     *
     * <p>The result iterator is managed by try-with-resources, so it is closed on every path and a
     * failure while closing never replaces an assertion failure raised in the loop.
     *
     * @param i number of producers that wrote to the table
     */
    private void checkBatchResult(int i) throws Exception {
        TableEnvironment bEnv = createBatchTableEnvironment();
        bEnv.executeSql(createCatalogSql("testCatalog", this.path));
        bEnv.executeSql("USE CATALOG testCatalog");

        ResultChecker checker = new ResultChecker();
        try (CloseableIterator<Row> rows = bEnv.executeSql("SELECT * FROM T").collect()) {
            while (rows.hasNext()) {
                checker.addChangelog(rows.next());
            }
        }
        checker.assertResult(i);
    }
}
