package org.apache.flink.table.store.kafka;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/store/kafka/KafkaLogITCase.class */
public class KafkaLogITCase extends KafkaTableTestBase {
    private final KafkaLogStoreFactory factory = KafkaLogTestUtils.discoverKafkaLogFactory();

    @Test
    public void testDropEmpty() {
        this.factory.onDropTable(KafkaLogTestUtils.testContext(getBootstrapServers(), CoreOptions.LogChangelogMode.AUTO, true), true);
    }

    @Test
    public void testUpsertTransactionKeyed() throws Exception {
        innerTest("UpsertTransactionKeyed", CoreOptions.LogChangelogMode.UPSERT, CoreOptions.LogConsistency.TRANSACTIONAL, true);
    }

    @Test
    public void testAllTransactionKeyed() throws Exception {
        innerTest("AllTransactionKeyed", CoreOptions.LogChangelogMode.ALL, CoreOptions.LogConsistency.TRANSACTIONAL, true);
    }

    @Test
    public void testUpsertEventualKeyed() throws Exception {
        innerTest("UpsertEventualKeyed", CoreOptions.LogChangelogMode.UPSERT, CoreOptions.LogConsistency.EVENTUAL, true);
    }

    @Test
    public void testAllEventualKeyed() throws Exception {
        innerTest("AllEventualKeyed", CoreOptions.LogChangelogMode.ALL, CoreOptions.LogConsistency.EVENTUAL, true);
    }

    @Test
    public void testAllTransactionNonKeyed() throws Exception {
        innerTest("AllTransactionNonKeyed", CoreOptions.LogChangelogMode.ALL, CoreOptions.LogConsistency.TRANSACTIONAL, false);
    }

    @Test
    public void testUpsertTransactionNonKeyed() {
        Assertions.assertThat(((IllegalArgumentException) org.junit.jupiter.api.Assertions.assertThrows(IllegalArgumentException.class, () -> {
            innerTest("UpsertTransactionNonKeyed", CoreOptions.LogChangelogMode.UPSERT, CoreOptions.LogConsistency.TRANSACTIONAL, false);
        })).getMessage()).isEqualTo("Can not use upsert changelog mode for non-pk table.");
    }

    @Test
    public void testUpsertEventualNonKeyed() {
        Assertions.assertThat(((IllegalArgumentException) org.junit.jupiter.api.Assertions.assertThrows(IllegalArgumentException.class, () -> {
            innerTest("UpsertEventualNonKeyed", CoreOptions.LogChangelogMode.UPSERT, CoreOptions.LogConsistency.EVENTUAL, false);
        })).getMessage()).isEqualTo("Can not use EVENTUAL consistency mode for non-pk table.");
    }

    @Test
    public void testAllEventualNonKeyed() {
        Assertions.assertThat(((IllegalArgumentException) org.junit.jupiter.api.Assertions.assertThrows(IllegalArgumentException.class, () -> {
            innerTest("AllEventualNonKeyed", CoreOptions.LogChangelogMode.ALL, CoreOptions.LogConsistency.EVENTUAL, false);
        })).getMessage()).isEqualTo("Can not use EVENTUAL consistency mode for non-pk table.");
    }

    /* JADX WARN: Type inference failed for: r4v20, types: [int[], int[][]] */
    private void innerTest(String str, CoreOptions.LogChangelogMode logChangelogMode, CoreOptions.LogConsistency logConsistency, boolean z) throws Exception {
        DynamicTableFactory.Context testContext = KafkaLogTestUtils.testContext(str, getBootstrapServers(), logChangelogMode, logConsistency, z);
        KafkaLogSinkProvider createSinkProvider = this.factory.createSinkProvider(testContext, KafkaLogTestUtils.SINK_CONTEXT);
        this.factory.onCreateTable(testContext, 3, true);
        try {
            enableCheckpoint();
            this.env.fromElements(new SinkRecord[]{KafkaLogTestUtils.testRecord(true, 2, 1, 2, RowKind.DELETE), KafkaLogTestUtils.testRecord(true, 1, 3, 4, RowKind.INSERT), KafkaLogTestUtils.testRecord(true, 0, 5, 6, RowKind.INSERT), KafkaLogTestUtils.testRecord(true, 0, 7, 8, RowKind.INSERT)}).addSink(createSinkProvider.createSink());
            this.env.execute();
            List<RowData> collect = collect(this.factory.createSourceProvider(testContext, KafkaLogTestUtils.SOURCE_CONTEXT, (int[][]) null).createSource((Map) null), 4);
            if (logChangelogMode == CoreOptions.LogChangelogMode.UPSERT) {
                assertRow(collect.get(0), RowKind.DELETE, 1, null);
            } else {
                assertRow(collect.get(0), RowKind.DELETE, 1, 2);
            }
            assertRow(collect.get(1), RowKind.INSERT, 3, 4);
            assertRow(collect.get(2), RowKind.INSERT, 5, 6);
            assertRow(collect.get(3), RowKind.INSERT, 7, 8);
            List<RowData> collect2 = collect(this.factory.createSourceProvider(testContext, KafkaLogTestUtils.SOURCE_CONTEXT, (int[][]) new int[]{new int[]{1}}).createSource((Map) null), 4);
            if (logChangelogMode == CoreOptions.LogChangelogMode.UPSERT) {
                assertValue(collect2.get(0), RowKind.DELETE, null);
            } else {
                assertValue(collect2.get(0), RowKind.DELETE, 2);
            }
            assertValue(collect2.get(1), RowKind.INSERT, 4);
            assertValue(collect2.get(2), RowKind.INSERT, 6);
            assertValue(collect2.get(3), RowKind.INSERT, 8);
            this.factory.onDropTable(testContext, true);
        } catch (Throwable th) {
            this.factory.onDropTable(testContext, true);
            throw th;
        }
    }

    private List<RowData> collect(KafkaSource<RowData> kafkaSource, int i) throws Exception {
        List<RowData> collectAndClose = BlockingIterator.of(this.env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "source").executeAndCollect()).collectAndClose(i);
        collectAndClose.sort(Comparator.comparingInt(rowData -> {
            return rowData.getInt(0);
        }));
        return collectAndClose;
    }

    private void enableCheckpoint() {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
        this.env.configure(configuration);
        this.env.enableCheckpointing(1000L);
    }

    private void assertRow(RowData rowData, RowKind rowKind, Integer num, Integer num2) {
        Assert.assertEquals(rowKind, rowData.getRowKind());
        Assert.assertEquals(num, rowData.isNullAt(0) ? null : Integer.valueOf(rowData.getInt(0)));
        Assert.assertEquals(num2, rowData.isNullAt(1) ? null : Integer.valueOf(rowData.getInt(1)));
    }

    private void assertValue(RowData rowData, RowKind rowKind, Integer num) {
        Assert.assertEquals(rowKind, rowData.getRowKind());
        Assert.assertEquals(num, rowData.isNullAt(0) ? null : Integer.valueOf(rowData.getInt(0)));
    }
}
