package org.apache.flink.streaming.connectors.pulsar;

import java.util.Arrays;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.ProducerConfiguration;

/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.class */
/**
 * Base class for table sinks that append {@link Row} records to a Pulsar topic.
 *
 * <p>An instance is constructed with connection settings only (service URL, topic, producer
 * configuration and the name of the routing key field). The row schema is supplied later through
 * {@link #configure(String[], TypeInformation[])}, which populates the instance returned by
 * {@link #createSink()} with the field names, field types, serialization schema and key
 * extractor. {@link #emitDataStream(DataStream)} refuses to run on an instance for which any of
 * those four values is still {@code null}.
 */
public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
    /** Service URL handed to the {@link FlinkPulsarProducer}; never {@code null}. */
    protected final String serviceUrl;
    /** Topic handed to the {@link FlinkPulsarProducer}; never {@code null}. */
    protected final String topic;
    /** Producer configuration handed to the {@link FlinkPulsarProducer}; never {@code null}. */
    protected final ProducerConfiguration producerConf;
    /** Schema used to serialize rows; {@code null} until set by {@code configure}. */
    protected SerializationSchema<Row> serializationSchema;
    /** Extracts the key from each row; {@code null} until set by {@code configure}. */
    protected PulsarKeyExtractor<Row> keyExtractor;
    /** Names of the row fields; {@code null} until set by {@code configure}. */
    protected String[] fieldNames;
    /**
     * Types of the row fields, parallel to {@link #fieldNames}; {@code null} until set by
     * {@code configure}. The raw element type is kept as declared so that code outside this class
     * that reads the field keeps compiling unchanged.
     */
    protected TypeInformation[] fieldTypes;
    /** Name of the row field used as key; not validated until {@code configure} is called. */
    protected final String routingKeyFieldName;

    /**
     * Key extractor that returns the value of a single {@code STRING}-typed row field.
     */
    private static class RowKeyExtractor implements PulsarKeyExtractor<Row> {
        /** Position of the key field within the row. */
        private final int keyIndex;

        /**
         * Resolves the position of the key field within the given schema.
         *
         * @param keyFieldName name of the field whose value is returned as key
         * @param fieldNames names of the row fields
         * @param fieldTypes types of the row fields, parallel to {@code fieldNames}
         * @throws IllegalArgumentException if the two arrays differ in length, if
         *     {@code keyFieldName} is {@code null} or not contained in {@code fieldNames}, or if
         *     the matching type is not {@link Types#STRING}
         */
        public RowKeyExtractor(
                String keyFieldName, String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            Preconditions.checkArgument(
                    fieldNames.length == fieldTypes.length,
                    "Number of provided field names and types does not match.");
            // Without this check a missing routing key is reported as
            // "Key field 'null' not found", which hides the real cause.
            Preconditions.checkArgument(
                    keyFieldName != null, "Routing key field name is not set");
            int index = Arrays.asList(fieldNames).indexOf(keyFieldName);
            Preconditions.checkArgument(index >= 0, "Key field '" + keyFieldName + "' not found");
            Preconditions.checkArgument(
                    Types.STRING.equals(fieldTypes[index]), "Key field must be of type 'STRING'");
            this.keyIndex = index;
        }

        /**
         * Returns the value stored at the key position of the row.
         *
         * @param row row to read the key from
         * @return the field value cast to {@code String}
         */
        @Override
        public String getKey(Row row) {
            return (String) row.getField(this.keyIndex);
        }
    }

    /**
     * Creates an unconfigured sink holding only the connection settings.
     *
     * @param serviceUrl service URL passed on to the producer; must not be {@code null}
     * @param topic topic passed on to the producer; must not be {@code null}
     * @param producerConf producer configuration; must not be {@code null}
     * @param routingKeyFieldName name of the row field used as key; stored as given and only
     *     checked against the schema in {@link #configure(String[], TypeInformation[])}
     * @throws NullPointerException if any of the first three arguments is {@code null}
     */
    public PulsarTableSink(
            String serviceUrl,
            String topic,
            ProducerConfiguration producerConf,
            String routingKeyFieldName) {
        this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "Service url not set");
        this.topic = Preconditions.checkNotNull(topic, "Topic is null");
        this.producerConf =
                Preconditions.checkNotNull(producerConf, "Producer configuration not set");
        this.routingKeyFieldName = routingKeyFieldName;
    }

    /**
     * Creates the schema used to serialize rows of the given type.
     *
     * @param rowTypeInfo row type built from the field types and names passed to
     *     {@link #configure(String[], TypeInformation[])}
     * @return the serialization schema stored on the configured sink
     */
    protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowTypeInfo);

    /**
     * Creates the sink instance that {@link #configure(String[], TypeInformation[])} populates
     * and returns.
     *
     * @return the instance to be configured
     */
    protected abstract PulsarTableSink createSink();

    /**
     * Builds the producer from this sink's connection settings, serialization schema and key
     * extractor.
     *
     * @return a new {@link FlinkPulsarProducer}
     */
    protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
        return new FlinkPulsarProducer<>(
                this.serviceUrl,
                this.topic,
                this.serializationSchema,
                this.producerConf,
                this.keyExtractor);
    }

    /**
     * Adds the producer created by {@link #createFlinkPulsarProducer()} as a sink of the stream.
     *
     * @param dataStream stream of rows to write
     * @throws IllegalStateException if the field names, field types, serialization schema or key
     *     extractor of this instance is {@code null}
     */
    public void emitDataStream(DataStream<Row> dataStream) {
        Preconditions.checkState(this.fieldNames != null, "Table sink is not configured");
        Preconditions.checkState(this.fieldTypes != null, "Table sink is not configured");
        Preconditions.checkState(this.serializationSchema != null, "Table sink is not configured");
        Preconditions.checkState(this.keyExtractor != null, "Table sink is not configured");
        dataStream.addSink(createFlinkPulsarProducer());
    }

    /**
     * Builds a {@link RowTypeInfo} from the stored field types and names.
     *
     * @return the row type of the records this sink accepts
     */
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(this.fieldTypes, this.fieldNames);
    }

    /**
     * Returns the stored field names array itself (not a copy).
     *
     * @return the field names, or {@code null} if none have been set on this instance
     */
    public String[] getFieldNames() {
        return this.fieldNames;
    }

    /**
     * Returns the stored field types array itself (not a copy).
     *
     * @return the field types, or {@code null} if none have been set on this instance
     */
    public TypeInformation<?>[] getFieldTypes() {
        return this.fieldTypes;
    }

    /**
     * Returns the instance obtained from {@link #createSink()}, populated with the given schema,
     * the serialization schema from {@link #createSerializationSchema(RowTypeInfo)} and a key
     * extractor for {@link #routingKeyFieldName}. No field of this instance is assigned.
     *
     * <p>The arrays are stored by reference, without copying.
     *
     * @param fieldNames names of the row fields; must not be {@code null}
     * @param fieldTypes types of the row fields, parallel to {@code fieldNames}; must not be
     *     {@code null}
     * @return the configured sink
     * @throws NullPointerException if either array is {@code null}
     * @throws IllegalArgumentException if the arrays differ in length, or if the routing key
     *     field is unset, absent from {@code fieldNames}, or not of type {@code STRING}
     */
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // Validate the arguments before asking the subclass for a new sink, so an unusable
        // schema never results in an instance being created.
        Preconditions.checkNotNull(fieldNames, "Field names are null");
        Preconditions.checkNotNull(fieldTypes, "Field types are null");
        Preconditions.checkArgument(
                fieldNames.length == fieldTypes.length,
                "Number of provided field names and types do not match");

        PulsarTableSink sink = createSink();
        sink.fieldNames = fieldNames;
        sink.fieldTypes = fieldTypes;
        sink.serializationSchema =
                createSerializationSchema(new RowTypeInfo(fieldTypes, fieldNames));
        sink.keyExtractor = new RowKeyExtractor(this.routingKeyFieldName, fieldNames, fieldTypes);
        return sink;
    }
}
