/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.kafka;

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
import org.apache.flink.table.store.kafka.KafkaLogTestUtils;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;

public class KafkaLogITCase
extends KafkaTableTestBase {
    private final KafkaLogStoreFactory factory = KafkaLogTestUtils.discoverKafkaLogFactory();

    @Test
    public void testDropEmpty() {
        this.factory.onDropTable(KafkaLogTestUtils.testContext(this.getBootstrapServers(), CoreOptions.LogChangelogMode.AUTO, true), true);
    }

    @Test
    public void testUpsertTransactionKeyed() throws Exception {
        this.innerTest("UpsertTransactionKeyed", CoreOptions.LogChangelogMode.UPSERT, CoreOptions.LogConsistency.TRANSACTIONAL, true);
    }

    @Test
    public void testAllTransactionKeyed() throws Exception {
        this.innerTest("AllTransactionKeyed", CoreOptions.LogChangelogMode.ALL, CoreOptions.LogConsistency.TRANSACTIONAL, true);
    }

    @Test
    public void testUpsertEventualKeyed() throws Exception {
        this.innerTest("UpsertEventualKeyed", CoreOptions.LogChangelogMode.UPSERT, CoreOptions.LogConsistency.EVENTUAL, true);
    }

    @Test
    public void testAllEventualKeyed() throws Exception {
        this.innerTest("AllEventualKeyed", CoreOptions.LogChangelogMode.ALL, CoreOptions.LogConsistency.EVENTUAL, true);
    }

    @Test
    public void testAllTransactionNonKeyed() throws Exception {
        this.innerTest("AllTransactionNonKeyed", CoreOptions.LogChangelogMode.ALL, CoreOptions.LogConsistency.TRANSACTIONAL, false);
    }

    @Test
    public void testUpsertTransactionNonKeyed() {
        IllegalArgumentException exception = (IllegalArgumentException)org.junit.jupiter.api.Assertions.assertThrows(IllegalArgumentException.class, () -> this.innerTest("UpsertTransactionNonKeyed", CoreOptions.LogChangelogMode.UPSERT, CoreOptions.LogConsistency.TRANSACTIONAL, false));
        Assertions.assertThat((String)exception.getMessage()).isEqualTo("Can not use upsert changelog mode for non-pk table.");
    }

    @Test
    public void testUpsertEventualNonKeyed() {
        IllegalArgumentException exception = (IllegalArgumentException)org.junit.jupiter.api.Assertions.assertThrows(IllegalArgumentException.class, () -> this.innerTest("UpsertEventualNonKeyed", CoreOptions.LogChangelogMode.UPSERT, CoreOptions.LogConsistency.EVENTUAL, false));
        Assertions.assertThat((String)exception.getMessage()).isEqualTo("Can not use EVENTUAL consistency mode for non-pk table.");
    }

    @Test
    public void testAllEventualNonKeyed() {
        IllegalArgumentException exception = (IllegalArgumentException)org.junit.jupiter.api.Assertions.assertThrows(IllegalArgumentException.class, () -> this.innerTest("AllEventualNonKeyed", CoreOptions.LogChangelogMode.ALL, CoreOptions.LogConsistency.EVENTUAL, false));
        Assertions.assertThat((String)exception.getMessage()).isEqualTo("Can not use EVENTUAL consistency mode for non-pk table.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void innerTest(String name, CoreOptions.LogChangelogMode changelogMode, CoreOptions.LogConsistency consistency, boolean keyed) throws Exception {
        DynamicTableFactory.Context context = KafkaLogTestUtils.testContext(name, this.getBootstrapServers(), changelogMode, consistency, keyed);
        KafkaLogSinkProvider sinkProvider = this.factory.createSinkProvider(context, KafkaLogTestUtils.SINK_CONTEXT);
        this.factory.onCreateTable(context, 3, true);
        try {
            this.enableCheckpoint();
            this.env.fromElements((Object[])new SinkRecord[]{KafkaLogTestUtils.testRecord(true, 2, 1, 2, RowKind.DELETE), KafkaLogTestUtils.testRecord(true, 1, 3, 4, RowKind.INSERT), KafkaLogTestUtils.testRecord(true, 0, 5, 6, RowKind.INSERT), KafkaLogTestUtils.testRecord(true, 0, 7, 8, RowKind.INSERT)}).addSink((SinkFunction)sinkProvider.createSink());
            this.env.execute();
            List<RowData> records = this.collect((KafkaSource<RowData>)this.factory.createSourceProvider(context, KafkaLogTestUtils.SOURCE_CONTEXT, (int[][])null).createSource(null), 4);
            if (changelogMode == CoreOptions.LogChangelogMode.UPSERT) {
                this.assertRow(records.get(0), RowKind.DELETE, 1, null);
            } else {
                this.assertRow(records.get(0), RowKind.DELETE, 1, 2);
            }
            this.assertRow(records.get(1), RowKind.INSERT, 3, 4);
            this.assertRow(records.get(2), RowKind.INSERT, 5, 6);
            this.assertRow(records.get(3), RowKind.INSERT, 7, 8);
            records = this.collect((KafkaSource<RowData>)this.factory.createSourceProvider(context, KafkaLogTestUtils.SOURCE_CONTEXT, (int[][])new int[][]{{1}}).createSource(null), 4);
            if (changelogMode == CoreOptions.LogChangelogMode.UPSERT) {
                this.assertValue(records.get(0), RowKind.DELETE, null);
            } else {
                this.assertValue(records.get(0), RowKind.DELETE, 2);
            }
            this.assertValue(records.get(1), RowKind.INSERT, 4);
            this.assertValue(records.get(2), RowKind.INSERT, 6);
            this.assertValue(records.get(3), RowKind.INSERT, 8);
        }
        finally {
            this.factory.onDropTable(context, true);
        }
    }

    private List<RowData> collect(KafkaSource<RowData> source, int numRecord) throws Exception {
        List records = BlockingIterator.of((Iterator)this.env.fromSource(source, WatermarkStrategy.noWatermarks(), "source").executeAndCollect()).collectAndClose(numRecord);
        records.sort(Comparator.comparingInt(o -> o.getInt(0)));
        return records;
    }

    private void enableCheckpoint() {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, (Object)true);
        this.env.configure((ReadableConfig)configuration);
        this.env.enableCheckpointing(1000L);
    }

    private void assertRow(RowData row, RowKind rowKind, Integer k, Integer v) {
        Assert.assertEquals((Object)rowKind, (Object)row.getRowKind());
        Assert.assertEquals((Object)k, row.isNullAt(0) ? null : Integer.valueOf(row.getInt(0)));
        Assert.assertEquals((Object)v, row.isNullAt(1) ? null : Integer.valueOf(row.getInt(1)));
    }

    private void assertValue(RowData row, RowKind rowKind, Integer v) {
        Assert.assertEquals((Object)rowKind, (Object)row.getRowKind());
        Assert.assertEquals((Object)v, row.isNullAt(0) ? null : Integer.valueOf(row.getInt(0)));
    }
}

