package org.apache.paimon.flink.action;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/action/ConsumerActionITCase.class */
/**
 * Integration test for resetting a streaming-read consumer, exercised either through
 * {@link ResetConsumerAction} or through the {@code sys.reset_consumer} procedure.
 */
public class ConsumerActionITCase extends ActionITCaseBase {

    /**
     * Writes three rows, consumes them with consumer id {@code myid}, then verifies that:
     *
     * <ol>
     *   <li>the consumer is recorded with next snapshot 4 after the streaming read,
     *   <li>resetting with an explicit next snapshot of 1 moves the consumer to 1,
     *   <li>resetting without a next snapshot leaves no consumer recorded under that id.
     * </ol>
     *
     * <p>Each reset is performed through a randomly chosen entry point (action or procedure),
     * so both code paths get coverage across runs.
     *
     * @throws Exception if writing, reading, or running the action/procedure fails
     */
    @Test
    public void testResetConsumer() throws Exception {
        ReadWriteTableTestUtil.init(this.warehouse);

        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
                        new String[] {"pk1", "col1"});
        FileStoreTable table =
                createFileStoreTable(
                        rowType,
                        Collections.emptyList(),
                        Collections.singletonList("pk1"),
                        Collections.emptyMap());

        StreamWriteBuilder writeBuilder =
                table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        this.write = writeBuilder.newWrite();
        this.commit = writeBuilder.newCommit();

        // NOTE(review): presumably each writeData call commits one snapshot, which is why the
        // consumer is expected at snapshot 4 below — confirm against ActionITCaseBase.
        writeData(rowData(1L, BinaryString.fromString("Hi")));
        writeData(rowData(2L, BinaryString.fromString("Hello")));
        writeData(rowData(3L, BinaryString.fromString("Paimon")));

        // Streaming read with a consumer id; the returned iterator is closed right away.
        ReadWriteTableTestUtil.testStreamingRead(
                        "SELECT * FROM `"
                                + this.tableName
                                + "` /*+ OPTIONS('consumer-id'='myid') */",
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {1L, "Hi"}),
                                TestValuesTableFactory.changelogRow(
                                        "+I", new Object[] {2L, "Hello"}),
                                TestValuesTableFactory.changelogRow(
                                        "+I", new Object[] {3L, "Paimon"})))
                .close();

        ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());

        Optional<Consumer> afterRead = consumerManager.consumer("myid");
        Assertions.assertThat(afterRead).isPresent();
        Assertions.assertThat(afterRead.get().nextSnapshot()).isEqualTo(4L);

        List<String> resetArgs =
                Arrays.asList(
                        "reset_consumer",
                        "--warehouse",
                        this.warehouse,
                        "--database",
                        this.database,
                        "--table",
                        this.tableName,
                        "--consumer_id",
                        "myid",
                        "--next_snapshot",
                        "1");

        // Reset the consumer to snapshot 1 via a randomly chosen entry point.
        if (ThreadLocalRandom.current().nextBoolean()) {
            createAction(ResetConsumerAction.class, resetArgs).run();
        } else {
            callProcedure(
                    String.format(
                            "CALL sys.reset_consumer('%s.%s', 'myid', 1)",
                            this.database, this.tableName));
        }

        Optional<Consumer> afterReset = consumerManager.consumer("myid");
        Assertions.assertThat(afterReset).isPresent();
        Assertions.assertThat(afterReset.get().nextSnapshot()).isEqualTo(1L);

        // Same arguments minus the trailing "--next_snapshot", "1" pair.
        List<String> deleteArgs = resetArgs.subList(0, resetArgs.size() - 2);
        if (ThreadLocalRandom.current().nextBoolean()) {
            createAction(ResetConsumerAction.class, deleteArgs).run();
        } else {
            callProcedure(
                    String.format(
                            "CALL sys.reset_consumer('%s.%s', 'myid')",
                            this.database, this.tableName));
        }

        Assertions.assertThat(consumerManager.consumer("myid")).isNotPresent();
    }
}
