/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.kafka;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.kafka.KafkaLogSerializationSchema;
import org.apache.flink.table.store.kafka.KafkaLogTestUtils;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Round-trip tests for the Kafka log serialization and deserialization schemas.
 *
 * <p>Each case serializes a {@link SinkRecord} into a {@link ProducerRecord}, converts it into a
 * {@link ConsumerRecord}, deserializes it back into a {@link RowData} and then checks the
 * partition, row kind, key and value of the result.
 */
public class KafkaLogSerializationTest {
    /** Topic name used when converting a producer record into a consumer record. */
    private static final String TOPIC = "my_topic";

    /** Round-trips keyed records in every changelog mode. */
    @Test
    public void testKeyed() throws Exception {
        this.checkKeyed(CoreOptions.LogChangelogMode.AUTO, 1, 3, 5);
        this.checkKeyed(CoreOptions.LogChangelogMode.UPSERT, 3, 6, 9);
        this.checkKeyed(CoreOptions.LogChangelogMode.ALL, 2, 5, 3);
    }

    /** Non-keyed records combined with UPSERT mode are expected to be rejected. */
    @Test
    public void testNonKeyedUpsert() {
        Assertions.assertThatThrownBy(() -> this.checkNonKeyed(CoreOptions.LogChangelogMode.UPSERT, 3, 6, 9))
                .isInstanceOf(IllegalArgumentException.class);
    }

    /** Round-trips non-keyed records in the AUTO and ALL changelog modes. */
    @Test
    public void testNonKeyed() throws Exception {
        this.checkNonKeyed(CoreOptions.LogChangelogMode.AUTO, 1, 3, 5);
        this.checkNonKeyed(CoreOptions.LogChangelogMode.ALL, 2, 5, 3);
    }

    /** Runs {@link #check} once per {@link RowKind} for a keyed record. */
    private void checkKeyed(CoreOptions.LogChangelogMode mode, int bucket, int key, int value) throws Exception {
        this.checkAllRowKinds(mode, true, bucket, key, value);
    }

    /** Runs {@link #check} once per {@link RowKind} for a non-keyed record. */
    private void checkNonKeyed(CoreOptions.LogChangelogMode mode, int bucket, int key, int value) throws Exception {
        this.checkAllRowKinds(mode, false, bucket, key, value);
    }

    /**
     * Runs {@link #check} for every {@link RowKind}, in declaration order (INSERT, UPDATE_BEFORE,
     * UPDATE_AFTER, DELETE).
     */
    private void checkAllRowKinds(CoreOptions.LogChangelogMode mode, boolean keyed, int bucket, int key, int value) throws Exception {
        for (RowKind rowKind : RowKind.values()) {
            this.check(mode, keyed, bucket, key, value, rowKind);
        }
    }

    /**
     * Serializes a single test record, deserializes it again and verifies the outcome.
     *
     * @param mode changelog mode used to build both schemas
     * @param keyed whether the test context and test record are keyed
     * @param bucket bucket of the test record; must equal the produced Kafka partition
     * @param key expected content of field 0 of the deserialized row
     * @param value expected content of field 1 when the value is expected to be present
     * @param rowKind row kind of the record being serialized
     * @throws Exception if opening the schemas, serializing or deserializing fails
     */
    private void check(CoreOptions.LogChangelogMode mode, boolean keyed, int bucket, int key, int value, RowKind rowKind) throws Exception {
        KafkaLogSerializationSchema serializer = KafkaLogSerializationTest.createTestSerializationSchema(KafkaLogTestUtils.testContext("", mode, keyed));
        serializer.open(null);
        KafkaRecordDeserializationSchema<RowData> deserializer = KafkaLogSerializationTest.createTestDeserializationSchema(KafkaLogTestUtils.testContext("", mode, keyed));
        deserializer.open(null);

        SinkRecord input = KafkaLogTestUtils.testRecord(keyed, bucket, key, value, rowKind);
        ProducerRecord<byte[], byte[]> record = serializer.serialize(input, null);
        Assertions.assertThat(record.partition()).isEqualTo(bucket);

        final AtomicReference<RowData> rowReference = new AtomicReference<>();
        deserializer.deserialize(this.toConsumerRecord(record), new Collector<RowData>() {

            @Override
            public void collect(RowData collected) {
                // Exactly one row is expected per Kafka record; a second emission is a failure.
                if (rowReference.get() != null) {
                    throw new IllegalStateException("Deserializer emitted more than one row for a single record");
                }
                rowReference.set(collected);
            }

            @Override
            public void close() {
            }
        });

        RowData row = rowReference.get();
        // Fail with a readable assertion instead of a NullPointerException when nothing was emitted.
        Assertions.assertThat(row).as("deserialized row").isNotNull();

        Assertions.assertThat(row.getRowKind()).isEqualTo(KafkaLogSerializationTest.expectedRowKind(rowKind));
        Assertions.assertThat(row.getInt(0)).isEqualTo(key);

        // The value field is expected to survive for INSERT rows, in ALL mode, and for non-keyed
        // records; in the remaining cases the test expects field 1 to be null.
        boolean valueExpected = row.getRowKind() == RowKind.INSERT || mode == CoreOptions.LogChangelogMode.ALL || !keyed;
        if (valueExpected) {
            Assertions.assertThat(row.getInt(1)).isEqualTo(value);
        } else {
            Assertions.assertThat(row.isNullAt(1)).isTrue();
        }
    }

    /**
     * Maps the row kind that was written to the row kind this test expects after the round trip:
     * UPDATE_BEFORE is expected back as DELETE, UPDATE_AFTER as INSERT, and every other kind
     * unchanged.
     */
    private static RowKind expectedRowKind(RowKind written) {
        if (written == RowKind.UPDATE_BEFORE) {
            return RowKind.DELETE;
        }
        if (written == RowKind.UPDATE_AFTER) {
            return RowKind.INSERT;
        }
        return written;
    }

    /**
     * Converts a producer record into the consumer record fed to the deserializer, keeping
     * partition, key and value and using {@link #TOPIC} with offset 0.
     */
    private ConsumerRecord<byte[], byte[]> toConsumerRecord(ProducerRecord<byte[], byte[]> record) {
        return new ConsumerRecord<>(TOPIC, record.partition(), 0L, record.key(), record.value());
    }

    /** Builds the serialization schema via the discovered Kafka log factory's sink provider. */
    private static KafkaLogSerializationSchema createTestSerializationSchema(DynamicTableFactory.Context context) {
        return KafkaLogTestUtils.discoverKafkaLogFactory().createSinkProvider(context, KafkaLogTestUtils.SINK_CONTEXT).createSerializationSchema();
    }

    /**
     * Builds the deserialization schema via the discovered Kafka log factory's source provider,
     * passing {@code null} as the third (projection) argument.
     */
    private static KafkaRecordDeserializationSchema<RowData> createTestDeserializationSchema(DynamicTableFactory.Context context) {
        return KafkaLogTestUtils.discoverKafkaLogFactory().createSourceProvider(context, KafkaLogTestUtils.SOURCE_CONTEXT, (int[][])null).createDeserializationSchema();
    }
}

