/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar.internal;

import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.pulsar.internal.DateTimeUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.IncompatibleSchemaException;
import org.apache.flink.streaming.connectors.pulsar.internal.JSONOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.JacksonRecordParser;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaTranslator;
import org.apache.flink.streaming.connectors.pulsar.internal.SimpleSchemaTranslator;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.ExceptionUtils;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.shade.com.fasterxml.jackson.core.JsonFactory;
import org.apache.pulsar.shade.com.fasterxml.jackson.core.JsonParser;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableSet;
import org.apache.pulsar.shade.org.apache.avro.Conversions;
import org.apache.pulsar.shade.org.apache.avro.LogicalTypes;
import org.apache.pulsar.shade.org.apache.avro.Schema;
import org.apache.pulsar.shade.org.apache.avro.SchemaBuilder;
import org.apache.pulsar.shade.org.apache.avro.generic.GenericData;
import org.apache.pulsar.shade.org.apache.avro.generic.GenericFixed;
import org.apache.pulsar.shade.org.apache.avro.generic.GenericRecord;
import org.apache.pulsar.shade.org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarDeserializer
implements PulsarDeserializationSchema<Row> {
    private static final Logger log = LoggerFactory.getLogger(PulsarDeserializer.class);
    private final Function<Message<?>, Row> converter;
    private final DataType rootDataType;
    private final FieldsDataType fieldsDataType;
    private final SchemaTranslator schemaTranslator;
    private final NewDecimalConversion decimalConversions = new NewDecimalConversion();

    public PulsarDeserializer(SchemaInfo schemaInfo, JSONOptions parsedOptions, boolean useExtendField) {
        try {
            this.schemaTranslator = new SimpleSchemaTranslator(useExtendField);
            this.fieldsDataType = this.schemaTranslator.pulsarSchemaToFieldsDataType(schemaInfo);
            this.rootDataType = this.schemaTranslator.schemaInfo2SqlType(schemaInfo);
            switch (schemaInfo.getType()) {
                case AVRO: {
                    FieldsDataType st = (FieldsDataType)this.rootDataType;
                    int fieldsNum = useExtendField ? st.getChildren().size() + PulsarOptions.META_FIELD_NAMES.size() : st.getChildren().size();
                    Schema avroSchema = new Schema.Parser().parse(new String(schemaInfo.getSchema(), StandardCharsets.UTF_8));
                    BinFunction<RowUpdater, GenericRecord> writer = this.getRecordWriter(avroSchema, st, new ArrayList<String>());
                    this.converter = msg -> {
                        RowUpdater fieldUpdater = new RowUpdater();
                        Row resultRow = new Row(fieldsNum);
                        fieldUpdater.setRow(resultRow);
                        Object value = msg.getValue();
                        writer.apply(fieldUpdater, ((GenericAvroRecord)value).getAvroRecord());
                        if (useExtendField) {
                            this.writeMetadataFields((Message<?>)msg, resultRow);
                        }
                        return resultRow;
                    };
                    break;
                }
                case JSON: {
                    FieldsDataType fdt = (FieldsDataType)this.rootDataType;
                    BiFunction<JsonFactory, String, JsonParser> createParser = (jsonFactory, s) -> {
                        try {
                            return jsonFactory.createParser(s);
                        }
                        catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    };
                    JacksonRecordParser rawParser = new JacksonRecordParser(this.rootDataType, parsedOptions);
                    JacksonRecordParser.FailureSafeRecordParser parser = new JacksonRecordParser.FailureSafeRecordParser((s, row) -> rawParser.parse((String)s, createParser, (Row)row), parsedOptions.getParseMode(), fdt);
                    this.converter = msg -> {
                        int rowSize = useExtendField ? fdt.getChildren().size() + PulsarOptions.META_FIELD_NAMES.size() : fdt.getChildren().size();
                        Row resultRow = new Row(rowSize);
                        byte[] value = msg.getData();
                        parser.parse(new String(value, StandardCharsets.UTF_8), resultRow);
                        if (useExtendField) {
                            this.writeMetadataFields((Message<?>)msg, resultRow);
                        }
                        return resultRow;
                    };
                    break;
                }
                default: {
                    TriFunction<RowUpdater, Integer, Object> writer2 = this.newAtomicWriter(this.rootDataType);
                    this.converter = msg -> {
                        RowUpdater fUpdater = new RowUpdater();
                        int rowSize = useExtendField ? 1 + PulsarOptions.META_FIELD_NAMES.size() : 1;
                        Row tmpRow = new Row(rowSize);
                        fUpdater.setRow(tmpRow);
                        Object value = msg.getValue();
                        writer2.apply(fUpdater, 0, value);
                        if (useExtendField) {
                            this.writeMetadataFields((Message<?>)msg, tmpRow);
                        }
                        return tmpRow;
                    };
                    break;
                }
            }
        }
        catch (IncompatibleSchemaException e) {
            log.error("Failed to convert pulsar schema to flink data type {}", (Object)ExceptionUtils.stringifyException((Throwable)e));
            throw new RuntimeException(e);
        }
    }

    private void writeMetadataFields(Message<?> message, Row row) {
        int metaStartIdx = row.getArity() - 5;
        if (message.hasKey()) {
            row.setField(metaStartIdx, (Object)message.getKeyBytes());
        } else {
            row.setField(metaStartIdx, null);
        }
        row.setField(metaStartIdx + 1, (Object)message.getTopicName());
        row.setField(metaStartIdx + 2, (Object)message.getMessageId().toByteArray());
        row.setField(metaStartIdx + 3, (Object)LocalDateTime.ofInstant(Instant.ofEpochMilli(message.getPublishTime()), ZoneId.systemDefault()));
        if (message.getEventTime() > 0L) {
            row.setField(metaStartIdx + 4, (Object)LocalDateTime.ofInstant(Instant.ofEpochMilli(message.getEventTime()), ZoneId.systemDefault()));
        } else {
            row.setField(metaStartIdx + 4, null);
        }
    }

    private TriFunction<RowUpdater, Integer, Object> newAtomicWriter(DataType dataType) {
        LogicalTypeRoot tpe = dataType.getLogicalType().getTypeRoot();
        switch (tpe) {
            case DATE: 
            case TIMESTAMP_WITHOUT_TIME_ZONE: {
                return (rowUpdater, ordinal, value) -> rowUpdater.set((int)ordinal, value);
            }
        }
        return (rowUpdater, ordinal, value) -> rowUpdater.set((int)ordinal, value);
    }

    private TriFunction<FlinkDataUpdater, Integer, Object> newWriter(Schema avroType, DataType flinkType, List<String> path) throws IncompatibleSchemaException {
        LogicalTypeRoot tpe = flinkType.getLogicalType().getTypeRoot();
        Schema.Type atpe = avroType.getType();
        if (atpe == Schema.Type.NULL && tpe == LogicalTypeRoot.NULL) {
            return (rowUpdater, ordinal, value) -> rowUpdater.setNullAt((int)ordinal);
        }
        if (atpe == Schema.Type.BOOLEAN && tpe == LogicalTypeRoot.BOOLEAN || atpe == Schema.Type.INT && tpe == LogicalTypeRoot.INTEGER || atpe == Schema.Type.LONG && tpe == LogicalTypeRoot.BIGINT || atpe == Schema.Type.FLOAT && tpe == LogicalTypeRoot.FLOAT || atpe == Schema.Type.DOUBLE && tpe == LogicalTypeRoot.DOUBLE) {
            return (rowUpdater, ordinal, value) -> rowUpdater.set((int)ordinal, value);
        }
        if (atpe == Schema.Type.INT && tpe == LogicalTypeRoot.DATE) {
            return (rowUpdater, ordinal, value) -> rowUpdater.set((int)ordinal, LocalDate.ofEpochDay((Long)value));
        }
        if (atpe == Schema.Type.LONG && tpe == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
            org.apache.pulsar.shade.org.apache.avro.LogicalType altpe = avroType.getLogicalType();
            if (altpe instanceof LogicalTypes.TimestampMillis) {
                return (rowUpdater, ordinal, value) -> rowUpdater.set((int)ordinal, DateTimeUtils.toJavaTimestamp((Long)value * 1000L).toLocalDateTime());
            }
            if (altpe instanceof LogicalTypes.TimestampMicros) {
                return (rowUpdater, ordinal, value) -> rowUpdater.set((int)ordinal, DateTimeUtils.toJavaTimestamp((Long)value).toLocalDateTime());
            }
            throw new IncompatibleSchemaException(String.format("Cannot convert Avro logical type %s to flink timestamp type", altpe.toString()));
        }
        if (atpe == Schema.Type.STRING && tpe == LogicalTypeRoot.VARCHAR) {
            return (rowUpdater, ordinal, value) -> {
                String s = null;
                if (value instanceof String) {
                    s = (String)value;
                } else if (value instanceof Utf8) {
                    Utf8 u8 = (Utf8)value;
                    byte[] bytes = new byte[u8.getByteLength()];
                    System.arraycopy(u8.getBytes(), 0, bytes, 0, u8.getByteLength());
                    s = new String(bytes, StandardCharsets.UTF_8);
                }
                rowUpdater.set((int)ordinal, s);
            };
        }
        if (atpe == Schema.Type.ENUM && tpe == LogicalTypeRoot.VARCHAR) {
            return (rowUpdater, ordinal, value) -> rowUpdater.set((int)ordinal, value.toString());
        }
        if (atpe == Schema.Type.FIXED && tpe == LogicalTypeRoot.BINARY) {
            return (rowUpdater, ordinal, value) -> rowUpdater.set((int)ordinal, ((GenericFixed)value).bytes().clone());
        }
        if (atpe == Schema.Type.BYTES && tpe == LogicalTypeRoot.VARBINARY) {
            return (rowUpdater, ordinal, value) -> {
                byte[] bytes = null;
                if (value instanceof ByteBuffer) {
                    ByteBuffer bb = (ByteBuffer)value;
                    bytes = new byte[bb.remaining()];
                    bb.get(bytes);
                } else if (value instanceof byte[]) {
                    bytes = (byte[])value;
                } else {
                    throw new RuntimeException(value.toString() + " is not a valid avro binary");
                }
                rowUpdater.set((int)ordinal, bytes);
            };
        }
        if (atpe == Schema.Type.FIXED && tpe == LogicalTypeRoot.DECIMAL) {
            DecimalType d = (DecimalType)flinkType.getLogicalType();
            return (rowUpdater, ordinal, value) -> {
                BigDecimal bigDecimal = this.decimalConversions.fromFixed((GenericFixed)value, avroType, (org.apache.pulsar.shade.org.apache.avro.LogicalType)LogicalTypes.decimal((int)d.getPrecision(), (int)d.getScale()));
                rowUpdater.set((int)ordinal, bigDecimal);
            };
        }
        if (atpe == Schema.Type.BYTES && tpe == LogicalTypeRoot.DECIMAL) {
            DecimalType d = (DecimalType)flinkType.getLogicalType();
            return (rowUpdater, ordinal, value) -> {
                BigDecimal bigDecimal = this.decimalConversions.fromBytes((ByteBuffer)value, avroType, (org.apache.pulsar.shade.org.apache.avro.LogicalType)LogicalTypes.decimal((int)d.getPrecision(), (int)d.getScale()));
                rowUpdater.set((int)ordinal, bigDecimal);
            };
        }
        if (atpe == Schema.Type.RECORD && tpe == LogicalTypeRoot.ROW) {
            FieldsDataType fieldsDataType = (FieldsDataType)flinkType;
            BinFunction<RowUpdater, GenericRecord> writeRecord = this.getRecordWriter(avroType, fieldsDataType, path);
            return (rowUpdater, ordinal, value) -> {
                Row row = new Row(fieldsDataType.getChildren().size());
                RowUpdater ru = new RowUpdater();
                ru.setRow(row);
                writeRecord.apply(ru, (GenericRecord)value);
                rowUpdater.set((int)ordinal, row);
            };
        }
        if (tpe == LogicalTypeRoot.ARRAY && atpe == Schema.Type.ARRAY && flinkType instanceof CollectionDataType) {
            DataType et = ((CollectionDataType)flinkType).getElementDataType();
            boolean containsNull = et.getLogicalType().isNullable();
            TriFunction<FlinkDataUpdater, Integer, Object> elementWriter = this.newWriter(avroType.getElementType(), et, path);
            return (rowUpdater, ordinal, value) -> {
                List array = (List)value;
                int len = array.size();
                Object[] result = new Object[len];
                ArrayDataUpdater elementUpdater = new ArrayDataUpdater(result);
                for (int i = 0; i < len; ++i) {
                    Object element = array.get(i);
                    if (element == null) {
                        if (!containsNull) {
                            throw new RuntimeException(String.format("Array value at path %s is not allowed to be null", path.toString()));
                        }
                        elementUpdater.setNullAt(i);
                        continue;
                    }
                    elementWriter.apply(elementUpdater, i, element);
                }
                rowUpdater.set((int)ordinal, result);
            };
        }
        if (tpe == LogicalTypeRoot.MAP && atpe == Schema.Type.MAP && ((KeyValueDataType)flinkType).getKeyDataType().getLogicalType().getTypeRoot() == LogicalTypeRoot.VARCHAR) {
            KeyValueDataType kvt = (KeyValueDataType)flinkType;
            DataType kt = kvt.getKeyDataType();
            TriFunction<FlinkDataUpdater, Integer, Object> keyWriter = this.newWriter((Schema)SchemaBuilder.builder().stringType(), kt, path);
            DataType vt = kvt.getValueDataType();
            TriFunction<FlinkDataUpdater, Integer, Object> valueWriter = this.newWriter(avroType.getValueType(), vt, path);
            boolean valueContainsNull = vt.getLogicalType().isNullable();
            return (rowUpdater, ordinal, value) -> {
                Map map = (Map)value;
                Object[] keys = new String[map.size()];
                Object[] values = new Object[map.size()];
                ArrayDataUpdater keyUpdater = new ArrayDataUpdater(keys);
                ArrayDataUpdater valueUpdater = new ArrayDataUpdater(values);
                Iterator iterator = map.entrySet().iterator();
                int i = 0;
                while (iterator.hasNext()) {
                    Map.Entry entry = iterator.next();
                    assert (entry.getKey() != null);
                    keyWriter.apply(keyUpdater, i, entry.getKey());
                    if (entry.getValue() == null) {
                        if (!valueContainsNull) {
                            throw new RuntimeException(String.format("Map value at path %s is not allowed to be null", path.toString()));
                        }
                        valueUpdater.setNullAt(i);
                    } else {
                        valueWriter.apply(valueUpdater, i, entry.getValue());
                    }
                    ++i;
                }
                HashMap<Object, Object> result = new HashMap<Object, Object>(map.size());
                for (int j = 0; j < map.size(); ++j) {
                    result.put(keys[j], values[j]);
                }
                rowUpdater.set((int)ordinal, result);
            };
        }
        if (atpe == Schema.Type.UNION) {
            List allTypes = avroType.getTypes();
            List nonNullTypes = allTypes.stream().filter(t -> t.getType() != Schema.Type.NULL).collect(Collectors.toList());
            if (!nonNullTypes.isEmpty()) {
                if (nonNullTypes.size() == 1) {
                    return this.newWriter((Schema)nonNullTypes.get(0), flinkType, path);
                }
                if (nonNullTypes.size() == 2) {
                    Schema.Type tp2;
                    Schema.Type tp1 = ((Schema)nonNullTypes.get(0)).getType();
                    if (ImmutableSet.of((Object)tp1, (Object)(tp2 = ((Schema)nonNullTypes.get(1)).getType())).equals((Object)ImmutableSet.of((Object)Schema.Type.INT, (Object)Schema.Type.LONG)) && flinkType == DataTypes.BIGINT()) {
                        return (updater, ordinal, value) -> {
                            if (value == null) {
                                updater.setNullAt((int)ordinal);
                            } else if (value instanceof Long) {
                                updater.set((int)ordinal, value);
                            } else if (value instanceof Integer) {
                                updater.set((int)ordinal, ((Integer)value).longValue());
                            }
                        };
                    }
                    if (ImmutableSet.of((Object)tp1, (Object)tp2).equals((Object)ImmutableSet.of((Object)Schema.Type.FLOAT, (Object)Schema.Type.DOUBLE)) && flinkType == DataTypes.DOUBLE()) {
                        return (updater, ordinal, value) -> {
                            if (value == null) {
                                updater.setNullAt((int)ordinal);
                            } else if (value instanceof Double) {
                                updater.set((int)ordinal, value);
                            } else if (value instanceof Float) {
                                updater.set((int)ordinal, ((Float)value).doubleValue());
                            }
                        };
                    }
                    throw new IncompatibleSchemaException(String.format("Cannot convert %s %s together to %s", tp1.toString(), tp2.toString(), flinkType.toString()));
                }
                if (tpe == LogicalTypeRoot.ROW && ((RowType)flinkType.getLogicalType()).getFieldCount() == nonNullTypes.size()) {
                    RowType rt = (RowType)flinkType.getLogicalType();
                    ArrayList<TriFunction<FlinkDataUpdater, Integer, Object>> fieldWriters = new ArrayList<TriFunction<FlinkDataUpdater, Integer, Object>>();
                    for (int i = 0; i < nonNullTypes.size(); ++i) {
                        Schema schema = (Schema)nonNullTypes.get(i);
                        String field = (String)rt.getFieldNames().get(i);
                        LogicalType logicalType = rt.getTypeAt(i);
                        fieldWriters.add(this.newWriter(schema, TypeConversions.fromLogicalToDataType((LogicalType)logicalType), Stream.concat(path.stream(), Stream.of(field)).collect(Collectors.toList())));
                    }
                    return (updater, ordinal, value) -> {
                        Row row = new Row(rt.getFieldCount());
                        RowUpdater fieldUpdater = new RowUpdater();
                        fieldUpdater.setRow(row);
                        int i = GenericData.get().resolveUnion(avroType, value);
                        ((TriFunction)fieldWriters.get(i)).apply(fieldUpdater, i, value);
                        updater.set((int)ordinal, row);
                    };
                }
                throw new IncompatibleSchemaException(String.format("Cannot convert avro to flink because schema at %s is not compatible (avroType = %s, sqlType = %s)", path.toString(), avroType.toString(), flinkType.toString()));
            }
            return (updater, ordinal, value) -> updater.setNullAt((int)ordinal);
        }
        throw new IncompatibleSchemaException(String.format("Cannot convert avro to flink because schema at path %s is not compatible (avroType = %s, sqlType = %s)", path.toString(), avroType.toString(), flinkType.toString()));
    }

    private BinFunction<RowUpdater, GenericRecord> getRecordWriter(Schema avroType, FieldsDataType sqlType, List<String> path) throws IncompatibleSchemaException {
        ArrayList<Integer> validFieldIndexes = new ArrayList<Integer>();
        ArrayList<BinFunction<RowUpdater, Object>> fieldWriters = new ArrayList<BinFunction<RowUpdater, Object>>();
        int length = sqlType.getChildren().size();
        RowType rowType = (RowType)sqlType.getLogicalType();
        List fields = rowType.getFields();
        for (int i = 0; i < length; ++i) {
            RowType.RowField sqlField = (RowType.RowField)fields.get(i);
            LogicalType logicalType = rowType.getTypeAt(i);
            Schema.Field avroField = avroType.getField(sqlField.getName());
            if (avroField != null) {
                validFieldIndexes.add(avroField.pos());
                TriFunction<FlinkDataUpdater, Integer, Object> baseWriter = this.newWriter(avroField.schema(), TypeConversions.fromLogicalToDataType((LogicalType)logicalType), Stream.concat(path.stream(), Stream.of(sqlField.getName())).collect(Collectors.toList()));
                int ordinal = i;
                BinFunction<RowUpdater, Object> fieldWriter = (updater, value) -> {
                    if (value == null) {
                        updater.setNullAt(ordinal);
                    } else {
                        baseWriter.apply((FlinkDataUpdater)updater, ordinal, value);
                    }
                };
                fieldWriters.add(fieldWriter);
                continue;
            }
            if (sqlField.getType().isNullable()) continue;
            throw new IncompatibleSchemaException(String.format("Cannot find non-nullable field in avro schema %s", avroType));
        }
        return (rowUpdater, record) -> {
            for (int i = 0; i < validFieldIndexes.size(); ++i) {
                ((BinFunction)fieldWriters.get(i)).apply(rowUpdater, record.get(((Integer)validFieldIndexes.get(i)).intValue()));
            }
        };
    }

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public Row deserialize(Message message) throws IOException {
        return this.converter.apply(message);
    }

    public TypeInformation<Row> getProducedType() {
        return TypeConversions.fromDataTypeToLegacyInfo((DataType)this.fieldsDataType);
    }

    @Override
    public org.apache.pulsar.client.api.Schema<Row> getSchema() {
        return null;
    }

    public static class NewDecimalConversion
    extends Conversions.DecimalConversion
    implements Serializable {
    }

    public static interface Function<T, R>
    extends Serializable {
        public R apply(T var1);
    }

    public static interface BinFunction<A, B>
    extends Serializable {
        public void apply(A var1, B var2);
    }

    public static interface TriFunction<A, B, C>
    extends Serializable {
        public void apply(A var1, B var2, C var3);
    }

    public static final class ArrayDataUpdater
    implements FlinkDataUpdater {
        private final Object[] array;

        public ArrayDataUpdater(Object[] array) {
            this.array = array;
        }

        @Override
        public void set(int ordinal, Object value) {
            this.array[ordinal] = value;
        }

        @Override
        public void setNullAt(int ordinal) {
            this.array[ordinal] = null;
        }
    }

    public static final class RowUpdater
    implements FlinkDataUpdater {
        private Row row;

        public void setRow(Row currentRow) {
            this.row = currentRow;
        }

        @Override
        public void set(int ordinal, Object value) {
            this.row.setField(ordinal, value);
        }

        @Override
        public void setNullAt(int ordinal) {
            this.row.setField(ordinal, null);
        }
    }

    static interface FlinkDataUpdater
    extends Serializable {
        public void set(int var1, Object var2);

        public void setNullAt(int var1);
    }
}

