package org.apache.flink.cdc.connectors.starrocks.sink;

import com.starrocks.connector.flink.table.data.StarRocksRowData;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.sink.v2.DefaultStarRocksSinkContext;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.BooleanType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DateType;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.FloatType;
import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.SmallIntType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.VarCharType;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link EventRecordSerializationSchema}.
 *
 * <p>The serializer is opened with the fixed zone {@code +08} and is then fed create-table,
 * add-column, drop-column and data change events for two tables. Schema change events are expected
 * to serialize to {@code null}; data change events are expected to produce a JSON row that carries
 * the column values plus an {@code __op} field.
 */
public class EventRecordSerializationSchemaTest {

    /** Target type used to parse JSON rows into key-sorted maps so they compare structurally. */
    private static final TypeReference<TreeMap<String, Object>> SORTED_ROW_TYPE =
            new TypeReference<TreeMap<String, Object>>() {};

    private EventRecordSerializationSchema serializer;
    private ObjectMapper objectMapper;

    /**
     * Minimal {@link Sink.InitContext} for building a {@link DefaultStarRocksSinkContext}.
     *
     * <p>Only {@link #getUserCodeClassLoader()} returns a value; every other method throws
     * {@link UnsupportedOperationException}.
     */
    private static class MockInitContext implements Sink.InitContext {

        public UserCodeClassLoader getUserCodeClassLoader() {
            return SimpleUserCodeClassLoader.create(MockInitContext.class.getClassLoader());
        }

        public MailboxExecutor getMailboxExecutor() {
            throw new UnsupportedOperationException();
        }

        public ProcessingTimeService getProcessingTimeService() {
            throw new UnsupportedOperationException();
        }

        public int getSubtaskId() {
            throw new UnsupportedOperationException();
        }

        public int getNumberOfParallelSubtasks() {
            throw new UnsupportedOperationException();
        }

        public int getAttemptNumber() {
            throw new UnsupportedOperationException();
        }

        public SinkWriterMetricGroup metricGroup() {
            throw new UnsupportedOperationException();
        }

        public OptionalLong getRestoredCheckpointId() {
            throw new UnsupportedOperationException();
        }

        public SerializationSchema.InitializationContext
                asSerializationSchemaInitializationContext() {
            throw new UnsupportedOperationException();
        }

        public boolean isObjectReuseEnabled() {
            throw new UnsupportedOperationException();
        }

        public <IN> TypeSerializer<IN> createInputSerializer() {
            throw new UnsupportedOperationException();
        }

        public JobID getJobId() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Minimal {@link SerializationSchema.InitializationContext} that supplies an unregistered
     * metric group and the class loader of this mock.
     */
    private static class MockInitializationContext
            implements SerializationSchema.InitializationContext {

        public MetricGroup getMetricGroup() {
            return new UnregisteredMetricsGroup();
        }

        public UserCodeClassLoader getUserCodeClassLoader() {
            return SimpleUserCodeClassLoader.create(
                    MockInitializationContext.class.getClassLoader());
        }
    }

    /** Creates and opens a fresh serializer (zone {@code +08}) and a JSON mapper for each test. */
    @Before
    public void setup() {
        this.serializer = new EventRecordSerializationSchema(ZoneId.of("+08"));
        this.serializer.open(
                new MockInitializationContext(),
                new DefaultStarRocksSinkContext(
                        new MockInitContext(),
                        new StarRocksSinkOptions(new Configuration(), new HashMap<>())));
        this.objectMapper = new ObjectMapper();
    }

    /** Closes the serializer opened in {@link #setup()}. */
    @After
    public void teardown() {
        this.serializer.close();
    }

    /**
     * Interleaves schema changes and data changes for two tables and verifies each serialized row.
     *
     * <p>The expectations below use {@code __op} 0 for insert and update events and 1 for delete
     * events.
     */
    @Test
    public void testMixedSchemaAndDataChanges() throws Exception {
        // 1. Create table1, then insert, delete and update rows in it.
        TableId table1 = TableId.parse("test.tbl1");
        Schema schema1 =
                Schema.newBuilder()
                        .physicalColumn("col1", new IntType())
                        .physicalColumn("col2", new BooleanType())
                        .physicalColumn("col3", new TimestampType())
                        .primaryKey("col1")
                        .build();
        Assert.assertNull(this.serializer.serialize(new CreateTableEvent(table1, schema1)));

        BinaryRecordDataGenerator generator1 = generatorFor(schema1);

        DataChangeEvent insertEvent1 =
                DataChangeEvent.insertEvent(
                        table1,
                        generator1.generate(
                                new Object[] {1, true, timestamp("2023-11-27 18:00:00")}));
        verifySerializeResult(
                table1,
                "{\"col1\":1,\"col2\":true,\"col3\":\"2023-11-27 18:00:00\",\"__op\":0}",
                this.serializer.serialize(insertEvent1));

        DataChangeEvent deleteEvent1 =
                DataChangeEvent.deleteEvent(
                        table1,
                        generator1.generate(
                                new Object[] {2, false, timestamp("2023-11-27 19:00:00")}));
        verifySerializeResult(
                table1,
                "{\"col1\":2,\"col2\":false,\"col3\":\"2023-11-27 19:00:00\",\"__op\":1}",
                this.serializer.serialize(deleteEvent1));

        // The expected row for an update carries the "after" values only.
        DataChangeEvent updateEvent1 =
                DataChangeEvent.updateEvent(
                        table1,
                        generator1.generate(
                                new Object[] {3, false, timestamp("2023-11-27 20:00:00")}),
                        generator1.generate(
                                new Object[] {3, true, timestamp("2023-11-27 21:00:00")}));
        verifySerializeResult(
                table1,
                "{\"col1\":3,\"col2\":true,\"col3\":\"2023-11-27 21:00:00\",\"__op\":0}",
                this.serializer.serialize(updateEvent1));

        // 2. Create table2 and insert a row into it.
        TableId table2 = TableId.parse("test.tbl2");
        Schema schema2 =
                Schema.newBuilder()
                        .physicalColumn("col1", new DateType())
                        .physicalColumn("col2", new FloatType())
                        .physicalColumn("col3", new VarCharType(20))
                        .primaryKey("col1")
                        .build();
        Assert.assertNull(this.serializer.serialize(new CreateTableEvent(table2, schema2)));

        BinaryRecordDataGenerator generator2 = generatorFor(schema2);
        DataChangeEvent insertEvent2 =
                DataChangeEvent.insertEvent(
                        table2,
                        generator2.generate(
                                new Object[] {
                                    epochDay(2023, 11, 27),
                                    3.4f,
                                    BinaryStringData.fromString("insert table2")
                                }));
        verifySerializeResult(
                table2,
                "{\"col1\":\"2023-11-27\",\"col2\":3.4,\"col3\":\"insert table2\",\"__op\":0}",
                this.serializer.serialize(insertEvent2));

        // 3. Add three columns to table1 and delete a row that uses the widened schema.
        AddColumnEvent addColumnEvent =
                new AddColumnEvent(
                        table1,
                        Arrays.asList(
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("col4", new DecimalType(20, 5))),
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("col5", new SmallIntType())),
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn(
                                                "col6", new LocalZonedTimestampType()))));
        BinaryRecordDataGenerator widenedGenerator1 =
                generatorFor(SchemaUtils.applySchemaChangeEvent(schema1, addColumnEvent));
        Assert.assertNull(this.serializer.serialize(addColumnEvent));

        // 21:00 at offset +10 is 11:00 UTC, which is 19:00 in the serializer's +08 zone; that is
        // why col6 is expected as "2023-11-27 19:00:00".
        DataChangeEvent deleteEvent2 =
                DataChangeEvent.deleteEvent(
                        table1,
                        widenedGenerator1.generate(
                                new Object[] {
                                    4,
                                    true,
                                    timestamp("2023-11-27 21:00:00"),
                                    DecimalData.fromBigDecimal(new BigDecimal("83.23"), 20, 5),
                                    (short) 9,
                                    LocalZonedTimestampData.fromInstant(
                                            LocalDateTime.of(2023, 11, 27, 21, 0, 0)
                                                    .toInstant(ZoneOffset.of("+10")))
                                }));
        verifySerializeResult(
                table1,
                "{\"col1\":4,\"col2\":true,\"col3\":\"2023-11-27 21:00:00\",\"col4\":83.23,\"col5\":9,\"col6\":\"2023-11-27 19:00:00\",\"__op\":1}",
                this.serializer.serialize(deleteEvent2));

        // 4. Drop two columns from table2 and insert a row that uses the narrowed schema.
        DropColumnEvent dropColumnEvent = new DropColumnEvent(table2, Arrays.asList("col2", "col3"));
        BinaryRecordDataGenerator narrowedGenerator2 =
                generatorFor(SchemaUtils.applySchemaChangeEvent(schema2, dropColumnEvent));
        Assert.assertNull(this.serializer.serialize(dropColumnEvent));

        DataChangeEvent insertEvent3 =
                DataChangeEvent.insertEvent(
                        table2,
                        narrowedGenerator2.generate(new Object[] {epochDay(2023, 11, 28)}));
        verifySerializeResult(
                table2,
                "{\"col1\":\"2023-11-28\",\"__op\":0}",
                this.serializer.serialize(insertEvent3));
    }

    /** Builds a record generator whose field types are the column types of {@code schema}. */
    private static BinaryRecordDataGenerator generatorFor(Schema schema) {
        return new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
    }

    /** Converts a {@code yyyy-MM-dd HH:mm:ss} literal into a {@link TimestampData}. */
    private static TimestampData timestamp(String text) {
        return TimestampData.fromTimestamp(Timestamp.valueOf(text));
    }

    /** Returns the given calendar date as days since the epoch, as an {@code int}. */
    private static int epochDay(int year, int month, int dayOfMonth) {
        return (int) LocalDate.of(year, month, dayOfMonth).toEpochDay();
    }

    /**
     * Asserts that a serialized row targets the expected table and carries the expected JSON.
     *
     * <p>Both JSON documents are parsed into key-sorted maps, so the comparison ignores field
     * order and whitespace. A row whose payload is {@code null} is compared as {@code null}.
     *
     * @param tableId table the row is expected to belong to
     * @param expectedRowJson expected JSON payload
     * @param actual row returned by the serializer; the assertion fails if it is {@code null}
     * @throws Exception if either JSON document cannot be parsed
     */
    private void verifySerializeResult(
            TableId tableId, String expectedRowJson, StarRocksRowData actual) throws Exception {
        // Fail with a readable message instead of a NullPointerException on the getters below.
        Assert.assertNotNull("Serializer returned no row for table " + tableId, actual);
        Assert.assertEquals(tableId.getSchemaName(), actual.getDatabase());
        Assert.assertEquals(tableId.getTableName(), actual.getTable());

        SortedMap<String, Object> expectedRow =
                this.objectMapper.readValue(expectedRowJson, SORTED_ROW_TYPE);
        SortedMap<String, Object> actualRow = null;
        if (actual.getRow() != null) {
            actualRow = this.objectMapper.readValue(actual.getRow(), SORTED_ROW_TYPE);
        }
        Assert.assertEquals(expectedRow, actualRow);
    }
}
