package org.apache.paimon.flink.kafka;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.util.Collector;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.types.RowKind;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/kafka/KafkaLogSerializationTest.class */
public class KafkaLogSerializationTest {
    private static final String TOPIC = "my_topic";

    @Test
    public void testKeyed() throws Exception {
        checkKeyed(CoreOptions.LogChangelogMode.AUTO, 1, 3, 5);
        checkKeyed(CoreOptions.LogChangelogMode.UPSERT, 3, 6, 9);
        checkKeyed(CoreOptions.LogChangelogMode.ALL, 2, 5, 3);
    }

    @Test
    public void testNonKeyedUpsert() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            checkNonKeyed(CoreOptions.LogChangelogMode.UPSERT, 3, 6, 9);
        });
    }

    @Test
    public void testNonKeyed() throws Exception {
        checkNonKeyed(CoreOptions.LogChangelogMode.AUTO, 1, 3, 5);
        checkNonKeyed(CoreOptions.LogChangelogMode.ALL, 2, 5, 3);
    }

    private void checkKeyed(CoreOptions.LogChangelogMode logChangelogMode, int i, int i2, int i3) throws Exception {
        check(logChangelogMode, true, i, i2, i3, RowKind.INSERT);
        check(logChangelogMode, true, i, i2, i3, RowKind.UPDATE_BEFORE);
        check(logChangelogMode, true, i, i2, i3, RowKind.UPDATE_AFTER);
        check(logChangelogMode, true, i, i2, i3, RowKind.DELETE);
    }

    private void checkNonKeyed(CoreOptions.LogChangelogMode logChangelogMode, int i, int i2, int i3) throws Exception {
        check(logChangelogMode, false, i, i2, i3, RowKind.INSERT);
        check(logChangelogMode, false, i, i2, i3, RowKind.UPDATE_BEFORE);
        check(logChangelogMode, false, i, i2, i3, RowKind.UPDATE_AFTER);
        check(logChangelogMode, false, i, i2, i3, RowKind.DELETE);
    }

    private void check(CoreOptions.LogChangelogMode logChangelogMode, boolean z, int i, int i2, int i3, RowKind rowKind) throws Exception {
        KafkaLogSerializationSchema createTestSerializationSchema = createTestSerializationSchema(KafkaLogTestUtils.testContext("", logChangelogMode, z));
        createTestSerializationSchema.open((SerializationSchema.InitializationContext) null);
        KafkaRecordDeserializationSchema<RowData> createTestDeserializationSchema = createTestDeserializationSchema(KafkaLogTestUtils.testContext("", logChangelogMode, z));
        createTestDeserializationSchema.open((DeserializationSchema.InitializationContext) null);
        ProducerRecord<byte[], byte[]> serialize = createTestSerializationSchema.serialize(KafkaLogTestUtils.testRecord(z, i, i2, i3, rowKind), (Long) null);
        org.assertj.core.api.Assertions.assertThat(serialize.partition().intValue()).isEqualTo(i);
        final AtomicReference atomicReference = new AtomicReference();
        createTestDeserializationSchema.deserialize(toConsumerRecord(serialize), new Collector<RowData>() { // from class: org.apache.paimon.flink.kafka.KafkaLogSerializationTest.1
            public void collect(RowData rowData) {
                if (atomicReference.get() != null) {
                    throw new RuntimeException();
                }
                atomicReference.set(rowData);
            }

            public void close() {
            }
        });
        RowData rowData = (RowData) atomicReference.get();
        if (rowKind == RowKind.UPDATE_BEFORE) {
            org.assertj.core.api.Assertions.assertThat(rowData.getRowKind()).isEqualTo(org.apache.flink.types.RowKind.DELETE);
        } else if (rowKind == RowKind.UPDATE_AFTER) {
            org.assertj.core.api.Assertions.assertThat(rowData.getRowKind()).isEqualTo(org.apache.flink.types.RowKind.INSERT);
        } else {
            org.assertj.core.api.Assertions.assertThat(rowData.getRowKind()).isEqualTo(FlinkRowData.toFlinkRowKind(rowKind));
        }
        org.assertj.core.api.Assertions.assertThat(rowData.getInt(0)).isEqualTo(i2);
        if (rowData.getRowKind() == org.apache.flink.types.RowKind.INSERT || logChangelogMode == CoreOptions.LogChangelogMode.ALL || !z) {
            org.assertj.core.api.Assertions.assertThat(rowData.getInt(1)).isEqualTo(i3);
        } else {
            org.assertj.core.api.Assertions.assertThat(rowData.isNullAt(1)).isTrue();
        }
    }

    private ConsumerRecord<byte[], byte[]> toConsumerRecord(ProducerRecord<byte[], byte[]> producerRecord) {
        return new ConsumerRecord<>(TOPIC, producerRecord.partition().intValue(), 0L, producerRecord.key(), producerRecord.value());
    }

    private static KafkaLogSerializationSchema createTestSerializationSchema(DynamicTableFactory.Context context) {
        return KafkaLogTestUtils.discoverKafkaLogFactory().createSinkProvider(context, KafkaLogTestUtils.SINK_CONTEXT).createSerializationSchema();
    }

    private static KafkaRecordDeserializationSchema<RowData> createTestDeserializationSchema(DynamicTableFactory.Context context) {
        return KafkaLogTestUtils.discoverKafkaLogFactory().createSourceProvider(context, KafkaLogTestUtils.SOURCE_CONTEXT, (int[][]) null).createDeserializationSchema();
    }
}
