package org.apache.flink.connector.jdbc.sink.writer;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.xa.SimpleXaConnectionProvider;
import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable;
import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/jdbc/sink/writer/ExactlyOnceJdbcWriterTest.class */
class ExactlyOnceJdbcWriterTest extends BaseJdbcWriterTest {
    ExactlyOnceJdbcWriterTest() {
    }

    @Override // org.apache.flink.connector.jdbc.sink.writer.BaseJdbcWriterTest
    protected JdbcExecutionOptions getExecutionOptions() {
        return JdbcExecutionOptions.builder().withMaxRetries(0).build();
    }

    @Override // org.apache.flink.connector.jdbc.sink.writer.BaseJdbcWriterTest
    protected JdbcExactlyOnceOptions getExactlyOnceOptions() {
        return JdbcExactlyOnceOptions.defaults();
    }

    @Override // org.apache.flink.connector.jdbc.sink.writer.BaseJdbcWriterTest
    protected DeliveryGuarantee getDeliveryGuarantee() {
        return DeliveryGuarantee.EXACTLY_ONCE;
    }

    @Override // org.apache.flink.connector.jdbc.sink.writer.BaseJdbcWriterTest
    protected JdbcConnectionProvider getConnectionProvider() {
        return SimpleXaConnectionProvider.from(() -> {
            return getMetadata().buildXaDataSource();
        }, getExactlyOnceOptions().getTimeoutSec());
    }

    @Test
    void testEmptyCheckpoint() throws Exception {
        checkPreCommitWithSnapshot(1L, false);
    }

    @Test
    void testCheckpoint() throws Exception {
        Iterator<BooksTable.BookEntry> it = BOOKS.iterator();
        while (it.hasNext()) {
            this.sinkWriter.write(it.next(), this.writerContext);
        }
        checkPreCommitWithSnapshot(1L, true);
        Assertions.assertThat(TEST_TABLE.selectAllTable(getMetadata().getConnection())).isEqualTo(BOOKS);
    }

    @Test
    void testMultipleCheckpoints() throws Exception {
        List<BooksTable.BookEntry> subList = BOOKS.subList(0, BOOKS.size() / 2);
        List<BooksTable.BookEntry> subList2 = BOOKS.subList(BOOKS.size() / 2, BOOKS.size());
        checkPreCommitWithSnapshot(1L, false);
        Iterator<BooksTable.BookEntry> it = subList.iterator();
        while (it.hasNext()) {
            this.sinkWriter.write(it.next(), this.writerContext);
        }
        checkPreCommitWithSnapshot(2L, true);
        Assertions.assertThat(TEST_TABLE.selectAllTable(getMetadata().getConnection())).isEqualTo(subList);
        Iterator<BooksTable.BookEntry> it2 = subList2.iterator();
        while (it2.hasNext()) {
            this.sinkWriter.write(it2.next(), this.writerContext);
        }
        checkPreCommitWithSnapshot(3L, true);
        checkPreCommitWithSnapshot(4L, false);
        Assertions.assertThat(TEST_TABLE.selectAllTable(getMetadata().getConnection())).isEqualTo(BOOKS);
    }

    protected void checkPreCommitWithSnapshot(long j, boolean z) throws Exception {
        this.sinkWriter.flush(false);
        Collection prepareCommit = this.sinkWriter.prepareCommit();
        if (z) {
            Assertions.assertThat(prepareCommit.size()).isEqualTo(1);
            checkCommitable((JdbcCommitable) prepareCommit.iterator().next(), withBranch(j - 1));
        } else {
            Assertions.assertThat(prepareCommit.size()).isEqualTo(0);
        }
        List snapshotState = this.sinkWriter.snapshotState(j);
        Assertions.assertThat(snapshotState).hasSize(1);
        checkSnapshot((JdbcWriterState) snapshotState.get(0), Collections.emptyList(), Collections.singletonList(withBranch(j)));
    }
}
