/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar.internal;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.streaming.connectors.pulsar.internal.DateTimeUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.IncompatibleSchemaException;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.SimpleSchemaTranslator;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.shade.org.apache.avro.Conversions;
import org.apache.pulsar.shade.org.apache.avro.LogicalTypes;
import org.apache.pulsar.shade.org.apache.avro.Schema;
import org.apache.pulsar.shade.org.apache.avro.generic.GenericData;
import org.apache.pulsar.shade.org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Converts Flink values into the objects used by Pulsar's Avro-based generic schemas.
 *
 * <p>The conversion function is assembled once, in the constructor, from the Flink
 * {@link DataType} and the Avro schema that {@code SimpleSchemaTranslator.sqlType2AvroSchema}
 * derives from it. {@link #serialize(Object)} only applies that prebuilt function.
 */
public class PulsarSerializer {
    private static final Logger log = LoggerFactory.getLogger(PulsarSerializer.class);
    private final DataType flinkType;
    private final boolean nullable;
    private final Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();
    private final Schema rootAvroType;
    private final Function<Object, Object> converter;

    /**
     * Builds the converter for the given Flink type.
     *
     * <p>Row types ({@link FieldsDataType}) are converted field by field into a
     * {@link GenericAvroRecord}. Any other type is treated as a single value that is read
     * from position 0 of the {@link Row} handed to {@link #serialize(Object)}.
     *
     * @param flinkType the Flink type of the data that will be serialized
     * @param nullable  whether the top-level value may be {@code null}; when {@code true},
     *                  a {@code null} input is serialized to {@code null}
     * @throws IllegalStateException if the Flink type cannot be mapped to its Avro schema
     */
    public PulsarSerializer(DataType flinkType, boolean nullable) {
        this.flinkType = flinkType;
        this.nullable = nullable;
        try {
            this.rootAvroType = SimpleSchemaTranslator.sqlType2AvroSchema(flinkType);
            final Schema actualAvroType = this.resolveNullableType(this.rootAvroType, nullable);
            final Function<Object, Object> baseConverter;
            if (flinkType instanceof FieldsDataType) {
                baseConverter = this.newStructConverter((FieldsDataType) flinkType, actualAvroType);
            } else {
                final BiFunction<PositionedGetter, Integer, Object> valueConverter =
                        this.singleValueConverter(flinkType, actualAvroType);
                // The single value is expected to arrive wrapped in a Row at position 0.
                baseConverter = data -> valueConverter.apply(new PositionedGetter((Row) data), 0);
            }
            if (nullable) {
                this.converter = data -> data == null ? null : baseConverter.apply(data);
            } else {
                this.converter = baseConverter;
            }
        } catch (IncompatibleSchemaException e) {
            // Pass the exception to the logger so the reason for the failure is not lost.
            log.error("Failed to create serializer while converting flink type to avro type", e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Applies the converter built in the constructor to one value.
     *
     * @param flinkData the value to convert; cast to {@link Row} by the converter
     * @return the converted object, or {@code null} for a {@code null} input when this
     *         serializer was created as nullable
     */
    public Object serialize(Object flinkData) {
        return this.converter.apply(flinkData);
    }

    /**
     * Creates the converter used when the top-level Flink type is not a row.
     *
     * <p>All supported combinations return the field value unchanged; DATE and TIMESTAMP
     * values are only cast to {@link LocalDate} / {@link LocalDateTime}.
     *
     * @param dataType the Flink type of the value
     * @param avroType the (non-nullable) Avro schema the value must match
     * @return a function reading the value at a given position of a {@link PositionedGetter}
     * @throws IncompatibleSchemaException if the Flink/Avro type combination is unsupported
     */
    private BiFunction<PositionedGetter, Integer, Object> singleValueConverter(DataType dataType, Schema avroType) throws IncompatibleSchemaException {
        LogicalTypeRoot tpe = dataType.getLogicalType().getTypeRoot();
        Schema.Type atpe = avroType.getType();
        if (tpe == LogicalTypeRoot.NULL && atpe == Schema.Type.NULL) {
            return (getter, ordinal) -> null;
        }
        if (tpe == LogicalTypeRoot.BOOLEAN && atpe == Schema.Type.BOOLEAN
                || tpe == LogicalTypeRoot.TINYINT && atpe == Schema.Type.INT
                || tpe == LogicalTypeRoot.SMALLINT && atpe == Schema.Type.INT
                || tpe == LogicalTypeRoot.INTEGER && atpe == Schema.Type.INT
                || tpe == LogicalTypeRoot.BIGINT && atpe == Schema.Type.LONG
                || tpe == LogicalTypeRoot.FLOAT && atpe == Schema.Type.FLOAT
                || tpe == LogicalTypeRoot.DOUBLE && atpe == Schema.Type.DOUBLE
                || tpe == LogicalTypeRoot.VARCHAR && atpe == Schema.Type.STRING
                || tpe == LogicalTypeRoot.VARBINARY && atpe == Schema.Type.BYTES) {
            return (getter, ordinal) -> getter.getField(ordinal);
        }
        if (tpe == LogicalTypeRoot.DATE && atpe == Schema.Type.INT) {
            return (getter, ordinal) -> (LocalDate) getter.getField(ordinal);
        }
        if (tpe == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE && atpe == Schema.Type.LONG) {
            org.apache.pulsar.shade.org.apache.avro.LogicalType altpe = avroType.getLogicalType();
            if (altpe instanceof LogicalTypes.TimestampMillis || altpe instanceof LogicalTypes.TimestampMicros) {
                return (getter, ordinal) -> (LocalDateTime) getter.getField(ordinal);
            }
            // Plain concatenation is null-safe: a LONG without a logical type must produce
            // this exception, not a NullPointerException.
            throw new IncompatibleSchemaException("Cannot convert flink timestamp to avro logical type " + altpe);
        }
        throw new IncompatibleSchemaException(String.format("Cannot convert flink type %s to avro type %s", dataType.toString(), avroType.toString(true)));
    }

    /**
     * Creates a converter that turns a {@link Row} into a {@link GenericAvroRecord}.
     *
     * <p>Per-field converters and the Pulsar schema are built once here; the returned
     * function only creates a record builder and fills it. A {@code null} field value is
     * written as {@code null} without invoking the field's converter.
     *
     * @param dataType   the Flink row type
     * @param avroStruct the Avro record schema; must have as many fields as the row type
     * @return a function converting a {@link Row} into a {@link GenericAvroRecord}
     * @throws IncompatibleSchemaException if the schema is not a record, the field counts
     *                                     differ, or a field type is unsupported
     */
    private Function<Object, Object> newStructConverter(FieldsDataType dataType, Schema avroStruct) throws IncompatibleSchemaException {
        if (avroStruct.getType() != Schema.Type.RECORD || avroStruct.getFields().size() != dataType.getChildren().size()) {
            throw new IncompatibleSchemaException(String.format("Cannot convert Flink type %s to Avro type %s.", dataType.toString(), avroStruct.toString(true)));
        }
        RowType rowType = (RowType) dataType.getLogicalType();
        int declaredFields = rowType.getFields().size();
        List<Schema.Field> avroFields = avroStruct.getFields();
        List<BiFunction<PositionedGetter, Integer, Object>> fieldConverters = new ArrayList<>(declaredFields);
        for (int i = 0; i < declaredFields; ++i) {
            DataType fieldType = TypeConversions.fromLogicalToDataType(rowType.getTypeAt(i));
            Schema fieldSchema = this.resolveNullableType(avroFields.get(i).schema(), fieldType.getLogicalType().isNullable());
            fieldConverters.add(this.newConverter(fieldType, fieldSchema));
        }
        int numFields = dataType.getChildren().size();
        // The Pulsar schema and its field list depend only on avroStruct, so they are
        // created once instead of once per serialized row.
        GenericSchema<GenericRecord> pSchema = SchemaUtils.avroSchema2PulsarSchema(avroStruct);
        List<Field> pulsarFields = pSchema.getFields();
        return row -> {
            Row rowX = (Row) row;
            PositionedGetter getter = new PositionedGetter(rowX);
            GenericRecordBuilder builder = pSchema.newRecordBuilder();
            for (int i = 0; i < numFields; ++i) {
                Field target = pulsarFields.get(i);
                if (rowX.getField(i) == null) {
                    builder.set(target, null);
                } else {
                    builder.set(target, fieldConverters.get(i).apply(getter, i));
                }
            }
            return (GenericAvroRecord) builder.build();
        };
    }

    /**
     * Creates the converter for one field (or array element) of a row.
     *
     * @param dataType the Flink type of the field
     * @param avroType the (non-nullable) Avro schema of the field
     * @return a function reading and converting the value at a given position
     * @throws IncompatibleSchemaException if the Flink/Avro type combination is unsupported
     */
    private BiFunction<PositionedGetter, Integer, Object> newConverter(DataType dataType, Schema avroType) throws IncompatibleSchemaException {
        LogicalTypeRoot tpe = dataType.getLogicalType().getTypeRoot();
        Schema.Type atpe = avroType.getType();
        if (tpe == LogicalTypeRoot.NULL && atpe == Schema.Type.NULL) {
            return (getter, ordinal) -> null;
        }
        // NOTE(review): TINYINT/SMALLINT values are handed over unchanged; confirm the
        // downstream Avro writer accepts Byte/Short for an int schema.
        if (tpe == LogicalTypeRoot.BOOLEAN && atpe == Schema.Type.BOOLEAN
                || tpe == LogicalTypeRoot.TINYINT && atpe == Schema.Type.INT
                || tpe == LogicalTypeRoot.SMALLINT && atpe == Schema.Type.INT
                || tpe == LogicalTypeRoot.INTEGER && atpe == Schema.Type.INT
                || tpe == LogicalTypeRoot.BIGINT && atpe == Schema.Type.LONG
                || tpe == LogicalTypeRoot.FLOAT && atpe == Schema.Type.FLOAT
                || tpe == LogicalTypeRoot.DOUBLE && atpe == Schema.Type.DOUBLE) {
            return (getter, ordinal) -> getter.getField(ordinal);
        }
        if (tpe == LogicalTypeRoot.DECIMAL && (atpe == Schema.Type.FIXED || atpe == Schema.Type.BYTES)) {
            DecimalType d = (DecimalType) dataType.getLogicalType();
            LogicalTypes.Decimal expected = LogicalTypes.decimal(d.getPrecision(), d.getScale());
            // Compare by value: LogicalTypes.decimal() creates a new instance on every call,
            // so a reference comparison could never succeed.
            if (expected.equals(avroType.getLogicalType())) {
                if (atpe == Schema.Type.FIXED) {
                    return (getter, ordinal) -> this.decimalConversion.toFixed((BigDecimal) getter.getField(ordinal), avroType, expected);
                }
                // A bytes-backed decimal has no fixed size, so it needs the bytes encoding.
                return (getter, ordinal) -> this.decimalConversion.toBytes((BigDecimal) getter.getField(ordinal), avroType, expected);
            }
            throw new IncompatibleSchemaException("Cannot convert flink decimal type to Avro logical type");
        }
        if (tpe == LogicalTypeRoot.VARBINARY && atpe == Schema.Type.BYTES) {
            // Avro generic data represents bytes as ByteBuffer, not as a raw byte[].
            return (getter, ordinal) -> ByteBuffer.wrap((byte[]) getter.getField(ordinal));
        }
        if (tpe == LogicalTypeRoot.DATE && atpe == Schema.Type.INT) {
            // The Avro schema is an int, so the epoch day must not be boxed as a Long.
            return (getter, ordinal) -> Math.toIntExact(((LocalDate) getter.getField(ordinal)).toEpochDay());
        }
        // Values are read as LocalDateTime, which matches TIMESTAMP_WITHOUT_TIME_ZONE (the
        // root also used by singleValueConverter); TIMESTAMP_WITH_TIME_ZONE is still
        // accepted for backward compatibility.
        if ((tpe == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE || tpe == LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE) && atpe == Schema.Type.LONG) {
            org.apache.pulsar.shade.org.apache.avro.LogicalType altpe = avroType.getLogicalType();
            if (altpe instanceof LogicalTypes.TimestampMillis) {
                return (getter, ordinal) -> DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf((LocalDateTime) getter.getField(ordinal))) / 1000L;
            }
            if (altpe instanceof LogicalTypes.TimestampMicros) {
                return (getter, ordinal) -> DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf((LocalDateTime) getter.getField(ordinal)));
            }
            // Null-safe concatenation: a LONG without a logical type must produce this
            // exception, not a NullPointerException.
            throw new IncompatibleSchemaException("Cannot convert flink timestamp to avro logical type " + altpe);
        }
        if (tpe == LogicalTypeRoot.VARCHAR && atpe == Schema.Type.STRING) {
            return (getter, ordinal) -> new Utf8((String) getter.getField(ordinal));
        }
        if (tpe == LogicalTypeRoot.VARCHAR && atpe == Schema.Type.ENUM) {
            HashSet<String> enumSymbols = new HashSet<>(avroType.getEnumSymbols());
            return (getter, ordinal) -> {
                String data = (String) getter.getField(ordinal);
                if (!enumSymbols.contains(data)) {
                    throw new IllegalArgumentException(String.format("Cannot write %s since it's not defined in enum %s", data, String.join(", ", enumSymbols)));
                }
                return new GenericData.EnumSymbol(avroType, data);
            };
        }
        if (tpe == LogicalTypeRoot.ARRAY && atpe == Schema.Type.ARRAY && dataType instanceof CollectionDataType) {
            DataType et = ((CollectionDataType) dataType).getElementDataType();
            boolean containsNull = et.getLogicalType().isNullable();
            BiFunction<PositionedGetter, Integer, Object> elementConverter = this.newConverter(et, this.resolveNullableType(avroType.getElementType(), containsNull));
            return (getter, ordinal) -> {
                Object[] arrayData = (Object[]) getter.getField(ordinal);
                PositionedGetter elementGetter = new PositionedGetter(arrayData);
                Object[] result = new Object[arrayData.length];
                for (int i = 0; i < arrayData.length; ++i) {
                    result[i] = containsNull && arrayData[i] == null ? null : elementConverter.apply(elementGetter, i);
                }
                return Arrays.asList(result);
            };
        }
        if (tpe == LogicalTypeRoot.MAP && atpe == Schema.Type.MAP
                && dataType instanceof KeyValueDataType
                && ((KeyValueDataType) dataType).getKeyDataType().getLogicalType().getTypeRoot() == LogicalTypeRoot.VARCHAR) {
            // Maps with string keys are handed over unchanged; values are not converted.
            return (getter, ordinal) -> getter.getField(ordinal);
        }
        if (tpe == LogicalTypeRoot.ROW && atpe == Schema.Type.RECORD) {
            FieldsDataType st = (FieldsDataType) dataType;
            Function<Object, Object> structConverter = this.newStructConverter(st, avroType);
            // Nested records are stored as the underlying Avro record, not the Pulsar wrapper.
            return (getter, ordinal) -> ((GenericAvroRecord) structConverter.apply(getter.getField(ordinal))).getAvroRecord();
        }
        throw new IncompatibleSchemaException(String.format("Cannot convert flink type %s to avro type %s", dataType.toString(), avroType.toString(true)));
    }

    /**
     * Maps the fields of an Avro record schema to Pulsar {@link Field}s (name and position).
     *
     * @param aschema an Avro record schema
     * @return one Pulsar field per Avro field, in schema order
     */
    private List<Field> getFields(Schema aschema) {
        return aschema.getFields().stream().map(f -> new Field(f.name(), f.pos())).collect(Collectors.toList());
    }

    /**
     * Strips the {@code null} branch from the union that represents a nullable type.
     *
     * @param avroType the Avro schema, possibly a union of {@code null} and one other type
     * @param nullable whether the corresponding Flink type is nullable
     * @return the single non-null branch when {@code nullable} is set and the schema is a
     *         union; otherwise the schema unchanged
     * @throws IllegalStateException if the union does not contain exactly one non-null branch
     */
    private Schema resolveNullableType(Schema avroType, boolean nullable) {
        // Only unions have branches; Schema.getTypes() fails on any other schema type.
        if (!nullable || avroType.getType() != Schema.Type.UNION) {
            return avroType;
        }
        List<Schema> nonNullBranches = avroType.getTypes().stream()
                .filter(branch -> branch.getType() != Schema.Type.NULL)
                .collect(Collectors.toList());
        if (nonNullBranches.size() != 1) {
            throw new IllegalStateException(String.format("Cannot resolve nullable avro type %s: expected a union of null and exactly one other type", avroType.toString(true)));
        }
        return nonNullBranches.get(0);
    }

    /**
     * Uniform positional access to either an {@code Object[]} or a {@link Row}.
     *
     * <p>When an array is present it takes precedence; otherwise the row is read.
     */
    public static class PositionedGetter {
        private final Object[] array;
        private final Row row;

        /**
         * @param array backing array, or {@code null} to read from {@code row}
         * @param row   backing row, used only when {@code array} is {@code null}
         */
        public PositionedGetter(Object[] array, Row row) {
            this.array = array;
            this.row = row;
        }

        /** Creates a getter backed by a row. */
        public PositionedGetter(Row row) {
            this(null, row);
        }

        /** Creates a getter backed by an array. */
        public PositionedGetter(Object[] array) {
            this(array, null);
        }

        /**
         * Returns the value at position {@code i} of the backing array or row.
         *
         * @param i zero-based position
         * @return the value stored at that position
         */
        public Object getField(int i) {
            if (this.array != null) {
                return this.array[i];
            }
            return this.row.getField(i);
        }
    }
}

