/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar;

import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase;
import org.apache.flink.streaming.connectors.pulsar.PulsarRowSerializationSchema;
import org.apache.flink.streaming.connectors.pulsar.SerializableFunction;
import org.apache.flink.streaming.connectors.pulsar.config.RecordSchemaType;
import org.apache.flink.streaming.connectors.pulsar.internal.DateTimeUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink that writes Flink {@link Row} records to Pulsar.
 *
 * <p>The row type handed to the constructor may contain up to three reserved
 * meta attributes ({@code __topic}, {@code __key}, {@code __eventTime}). They
 * are stripped from the payload and used to route / annotate the outgoing
 * message; every remaining field becomes part of the serialized value.
 */
@Deprecated
public class FlinkPulsarRowSink
extends FlinkPulsarSinkBase<Row> {
    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarRowSink.class);

    /** Reserved attribute carrying the per-record target topic (must be VARCHAR). */
    private static final String TOPIC_ATTRIBUTE = "__topic";
    /** Reserved attribute carrying the per-record message key (must be VARBINARY). */
    private static final String KEY_ATTRIBUTE = "__key";
    /** Reserved attribute carrying the per-record event time (must be TIMESTAMP). */
    private static final String EVENT_TIME_ATTRIBUTE = "__eventTime";
    /** Marker stored in the meta index table when a meta attribute is not part of the row. */
    private static final int ABSENT = -1;

    protected final DataType dataType;
    /** Row type of the payload, i.e. {@link #dataType} without the meta attributes. */
    private DataType valueType;
    /** Extracts the payload fields from an incoming row. */
    private SerializableFunction<Row, Row> valueProjection;
    /** Extracts (topic, key, eventTime) from an incoming row; absent attributes stay null. */
    private SerializableFunction<Row, Row> metaProjection;

    /**
     * Creates the sink from an already built client configuration.
     *
     * @param adminUrl            passed through to the base sink
     * @param defaultTopicName    optional fixed target topic
     * @param clientConf          Pulsar client configuration, passed through to the base sink
     * @param properties          sink properties, passed through to the base sink
     * @param serializationSchema schema used to serialize the payload row
     * @param dataType            row type of incoming records; must be a {@link FieldsDataType}
     * @throws IllegalStateException if a meta attribute has an unsupported type, or if the row
     *                               has no {@code __topic} attribute and the base sink reports
     *                               that no topic is forced
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public FlinkPulsarRowSink(String adminUrl, Optional<String> defaultTopicName, ClientConfigurationData clientConf, Properties properties, SerializationSchema serializationSchema, DataType dataType) {
        // orElse(null) instead of get(): createProjection()/invoke() explicitly support rows that
        // carry their own __topic, so a missing default topic must not fail with
        // NoSuchElementException before that validation runs.
        // NOTE(review): assumes PulsarRowSerializationSchema tolerates a null topic - confirm.
        super(adminUrl, defaultTopicName, clientConf, properties, new RowSinkSerializationSchema(defaultTopicName.orElse(null), (SerializationSchema<Row>) serializationSchema, RecordSchemaType.AVRO, dataType));
        this.dataType = dataType;
        this.createProjection();
    }

    /**
     * Creates the sink from a service URL; the client configuration is derived from
     * {@code serviceUrl} and {@code properties}.
     *
     * @param serviceUrl Pulsar service URL; must not be null
     */
    public FlinkPulsarRowSink(String serviceUrl, String adminUrl, Optional<String> defaultTopicName, Properties properties, SerializationSchema<Row> serializationSchema, DataType dataType) {
        this(adminUrl, defaultTopicName, PulsarClientUtils.newClientConf(Preconditions.checkNotNull(serviceUrl), properties), properties, serializationSchema, dataType);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    /**
     * Validates the meta attributes of {@link #dataType} and builds
     * {@link #valueType}, {@link #metaProjection} and {@link #valueProjection}.
     */
    private void createProjection() {
        FieldsDataType fieldsDataType = (FieldsDataType) this.dataType;
        RowType rowType = (RowType) fieldsDataType.getLogicalType();
        List<RowType.RowField> rowFields = rowType.getFields();

        // field name -> (type root, position in the incoming row)
        HashMap<String, Tuple2<LogicalTypeRoot, Integer>> name2Type = new HashMap<>();
        for (int i = 0; i < rowFields.size(); ++i) {
            RowType.RowField field = rowFields.get(i);
            name2Type.put(field.getName(), new Tuple2<>(field.getType().getTypeRoot(), i));
        }

        // Positions of (topic, key, eventTime) in the incoming row, ABSENT when missing.
        final int[] metas = new int[3];

        Tuple2<LogicalTypeRoot, Integer> topicField = name2Type.get(TOPIC_ATTRIBUTE);
        if (topicField != null) {
            if (topicField.f0 != LogicalTypeRoot.VARCHAR) {
                throw new IllegalStateException(String.format("attribute unsupported type %s, %s must be a string", topicField.f0.toString(), TOPIC_ATTRIBUTE));
            }
            metas[0] = topicField.f1;
        } else {
            // Without a per-record topic the sink can only work with a forced topic.
            if (!this.forcedTopic) {
                throw new IllegalStateException(String.format("topic option required when no %s attribute is present.", TOPIC_ATTRIBUTE));
            }
            metas[0] = ABSENT;
        }
        metas[1] = optionalMetaIndex(name2Type, KEY_ATTRIBUTE, LogicalTypeRoot.VARBINARY);
        metas[2] = optionalMetaIndex(name2Type, EVENT_TIME_ATTRIBUTE, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);

        // Everything that is not a reserved meta attribute belongs to the payload.
        List<RowType.RowField> valueFields = rowFields.stream()
                .filter(f -> !PulsarOptions.META_FIELD_NAMES.contains(f.getName()))
                .collect(Collectors.toList());
        DataTypes.Field[] payloadFields = valueFields.stream()
                .map(f -> DataTypes.FIELD(f.getName(), TypeConversions.fromLogicalToDataType(f.getType())))
                .toArray(DataTypes.Field[]::new);
        this.valueType = DataTypes.ROW(payloadFields);

        // Positions of the payload fields in the incoming row, in payload order.
        final int[] valueIndices = valueFields.stream()
                .mapToInt(f -> name2Type.get(f.getName()).f1)
                .toArray();

        this.metaProjection = row -> {
            Row result = new Row(metas.length);
            for (int i = 0; i < metas.length; ++i) {
                if (metas[i] != ABSENT) {
                    result.setField(i, row.getField(metas[i]));
                }
            }
            return result;
        };
        this.valueProjection = row -> {
            Row result = new Row(valueIndices.length);
            for (int i = 0; i < valueIndices.length; ++i) {
                result.setField(i, row.getField(valueIndices[i]));
            }
            return result;
        };
    }

    /**
     * Looks up an optional meta attribute and checks its type.
     *
     * @return the attribute's position in the incoming row, or {@link #ABSENT} if the row
     *         does not declare it
     * @throws IllegalStateException if the attribute exists with a type root other than
     *                               {@code expectedRoot}
     */
    private static int optionalMetaIndex(HashMap<String, Tuple2<LogicalTypeRoot, Integer>> name2Type, String attributeName, LogicalTypeRoot expectedRoot) {
        Tuple2<LogicalTypeRoot, Integer> field = name2Type.get(attributeName);
        if (field == null) {
            return ABSENT;
        }
        if (field.f0 != expectedRoot) {
            throw new IllegalStateException(String.format("%s attribute unsupported type %s", attributeName, field.f0.toString()));
        }
        return field.f1;
    }

    /** Returns the Pulsar schema derived from the payload row type. */
    protected Schema<?> getPulsarSchema() {
        return this.dataType2PulsarSchema(this.valueType);
    }

    /**
     * Converts a Flink data type into a generic Pulsar schema whose definition is the
     * Avro schema of that type. The schema is tagged as AVRO when the {@code format.type}
     * property equals {@code avro}, and as JSON otherwise (including when it is unset).
     */
    private Schema<?> dataType2PulsarSchema(DataType dataType) {
        org.apache.avro.Schema avroSchema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
        SchemaInfo schemaInfo = new SchemaInfo();
        schemaInfo.setSchema(avroSchema.toString().getBytes(StandardCharsets.UTF_8));
        String formatName = this.properties.getProperty("format.type", "json");
        if ("avro".equals(formatName)) {
            schemaInfo.setName("Avro");
            schemaInfo.setType(SchemaType.AVRO);
        } else {
            schemaInfo.setName("Json");
            schemaInfo.setType(SchemaType.JSON);
        }
        return Schema.generic(schemaInfo);
    }

    /**
     * Serializes the payload of {@code value} and sends it asynchronously.
     *
     * <p>The target topic is the default topic when the base sink reports a forced topic,
     * otherwise the record's {@code __topic} attribute. Records without a topic are dropped
     * unless {@code failOnWrite} is set.
     *
     * @throws NullPointerException if the record has no topic and {@code failOnWrite} is set
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void invoke(Row value, SinkFunction.Context context) throws Exception {
        this.checkErroneous();
        this.initializeSendCallback();

        Row metaRow = this.metaProjection.apply(value);
        Row valueRow = this.valueProjection.apply(value);
        byte[] serializedValue = this.serializationSchema.serialize(valueRow);

        String topic = this.forcedTopic ? this.defaultTopic : (String) metaRow.getField(0);
        // Kept as Object: createProjection() only admits VARBINARY keys, so the usual
        // runtime value is a byte[], not a String.
        Object key = metaRow.getField(1);
        Timestamp eventTime = (Timestamp) metaRow.getField(2);

        if (topic == null) {
            if (this.failOnWrite) {
                throw new NullPointerException("null topic present in the data");
            }
            return;
        }

        TypedMessageBuilder builder = this.getProducer(topic).newMessage().value(serializedValue);
        if (key != null) {
            builder.keyBytes(toKeyBytes(key));
        }
        if (eventTime != null) {
            long convertedEventTime = DateTimeUtils.fromJavaTimestamp(eventTime);
            // Non-positive values are not forwarded to the message builder.
            if (convertedEventTime > 0L) {
                builder.eventTime(convertedEventTime);
            }
        }
        if (this.flushOnCheckpoint) {
            synchronized (this.pendingRecordsLock) {
                ++this.pendingRecords;
            }
        }
        builder.sendAsync().whenComplete(this.sendCallback);
    }

    /**
     * Converts a non-null {@code __key} value into message key bytes.
     *
     * <p>{@code byte[]} values are used as-is. Anything else is cast to {@link String}
     * (raising {@link ClassCastException} for other types) and encoded as UTF-8.
     */
    private static byte[] toKeyBytes(Object key) {
        if (key instanceof byte[]) {
            return (byte[]) key;
        }
        return ((String) key).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Serialization schema handed to the base sink: projects a row through a configurable
     * value projection and then delegates to the wrapped value serializer.
     */
    static class RowSinkSerializationSchema
    extends PulsarRowSerializationSchema {
        private SerializableFunction<Row, Row> valueProjection;
        private SerializationSchema<Row> valueSerialization;

        RowSinkSerializationSchema(String topic, SerializationSchema<Row> valueSerialization, RecordSchemaType recordSchemaType, DataType dataType) {
            super(topic, valueSerialization, false, null, null, recordSchemaType, dataType);
            this.valueSerialization = valueSerialization;
        }

        /**
         * Sets the projection applied before serialization.
         *
         * <p>NOTE(review): nothing in the enclosing class calls this setter, yet
         * {@link #serialize(Row)} rejects a null projection - confirm who is expected
         * to set it.
         */
        public void setValueProjection(SerializableFunction<Row, Row> valueProjection) {
            this.valueProjection = valueProjection;
        }

        /**
         * Projects {@code element} and serializes the result.
         *
         * @throws NullPointerException if no value projection has been set
         */
        @Override
        public byte[] serialize(Row element) {
            Preconditions.checkNotNull(this.valueProjection, "valueProjection must be not null");
            Row projected = this.valueProjection.apply(element);
            return this.valueSerialization.serialize(projected);
        }
    }
}

