/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSerializationSchema;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class FlinkPulsarSinkBase<T>
extends RichSinkFunction<T>
implements CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSinkBase.class);
    protected String adminUrl;
    protected ClientConfigurationData clientConfigurationData;
    protected final Map<String, String> caseInsensitiveParams;
    protected final Map<String, Object> producerConf;
    protected final Properties properties;
    protected boolean flushOnCheckpoint;
    protected boolean failOnWrite;
    protected final SerializableObject pendingRecordsLock = new SerializableObject();
    protected long pendingRecords = 0L;
    protected final boolean forcedTopic;
    protected final String defaultTopic;
    protected final PulsarSerializationSchema<T> serializationSchema;
    protected volatile transient Throwable failedWrite;
    protected transient PulsarAdmin admin;
    protected transient BiConsumer<MessageId, Throwable> sendCallback;
    protected transient Producer<byte[]> singleProducer;
    protected transient Map<String, Producer<byte[]>> topic2Producer;

    public FlinkPulsarSinkBase(String adminUrl, Optional<String> defaultTopicName, ClientConfigurationData clientConf, Properties properties, PulsarSerializationSchema<T> serializationSchema) {
        this.adminUrl = (String)Preconditions.checkNotNull((Object)adminUrl);
        if (defaultTopicName.isPresent()) {
            this.forcedTopic = true;
            this.defaultTopic = defaultTopicName.get();
        } else {
            this.forcedTopic = false;
            this.defaultTopic = null;
        }
        this.serializationSchema = serializationSchema;
        this.clientConfigurationData = clientConf;
        this.properties = (Properties)Preconditions.checkNotNull((Object)properties);
        this.caseInsensitiveParams = SourceSinkUtils.toCaceInsensitiveParams((Map<String, String>)Maps.fromProperties((Properties)properties));
        this.producerConf = SourceSinkUtils.getProducerParams((Map<String, String>)Maps.fromProperties((Properties)properties));
        this.flushOnCheckpoint = SourceSinkUtils.flushOnCheckpoint(this.caseInsensitiveParams);
        this.failOnWrite = SourceSinkUtils.failOnWrite(this.caseInsensitiveParams);
        CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(this.caseInsensitiveParams));
        if (this.clientConfigurationData.getServiceUrl() == null) {
            throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration");
        }
    }

    public FlinkPulsarSinkBase(String serviceUrl, String adminUrl, Optional<String> defaultTopicName, Properties properties, PulsarSerializationSchema serializationSchema) {
        this(adminUrl, defaultTopicName, PulsarClientUtils.newClientConf((String)Preconditions.checkNotNull((Object)serviceUrl), properties), properties, serializationSchema);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.checkErroneous();
        if (this.flushOnCheckpoint) {
            this.producerFlush();
            SerializableObject serializableObject = this.pendingRecordsLock;
            synchronized (serializableObject) {
                if (this.pendingRecords != 0L) {
                    throw new IllegalStateException("Pending record count must be zero at this point " + this.pendingRecords);
                }
                this.checkErroneous();
            }
        }
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    public void open(Configuration parameters) throws Exception {
        if (this.flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) {
            log.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            this.flushOnCheckpoint = false;
        }
        this.admin = PulsarClientUtils.newAdminFromConf(this.adminUrl, this.clientConfigurationData);
        this.serializationSchema.open(() -> this.getRuntimeContext().getMetricGroup().addGroup("user"));
        if (this.forcedTopic) {
            this.uploadSchema(this.defaultTopic);
            this.singleProducer = this.createProducer(this.clientConfigurationData, this.producerConf, this.defaultTopic);
        } else {
            this.topic2Producer = new HashMap<String, Producer<byte[]>>();
        }
    }

    protected void initializeSendCallback() {
        if (this.sendCallback != null) {
            return;
        }
        this.sendCallback = this.failOnWrite ? (t, u) -> {
            if (this.failedWrite == null && u == null) {
                this.acknowledgeMessage();
            } else if (this.failedWrite == null && u != null) {
                this.failedWrite = u;
            }
        } : (t, u) -> {
            if (this.failedWrite == null && u != null) {
                log.error("Error while sending message to Pulsar: {}", (Object)ExceptionUtils.stringifyException((Throwable)u));
            }
            this.acknowledgeMessage();
        };
    }

    private void uploadSchema(String topic) {
        SchemaUtils.uploadPulsarSchema(this.admin, topic, this.serializationSchema.getPulsarSchema().getSchemaInfo());
    }

    public void close() throws Exception {
        this.checkErroneous();
        this.producerClose();
        this.checkErroneous();
    }

    protected Producer<byte[]> getProducer(String topic) {
        if (this.forcedTopic) {
            return this.singleProducer;
        }
        if (this.topic2Producer.containsKey(topic)) {
            return this.topic2Producer.get(topic);
        }
        this.uploadSchema(topic);
        Producer<byte[]> p = this.createProducer(this.clientConfigurationData, this.producerConf, topic);
        this.topic2Producer.put(topic, p);
        return p;
    }

    protected Producer<byte[]> createProducer(ClientConfigurationData clientConf, Map<String, Object> producerConf, String topic) {
        try {
            return CachedPulsarClient.getOrCreate(clientConf).newProducer(Schema.AUTO_PRODUCE_BYTES(this.serializationSchema.getPulsarSchema())).topic(topic).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).batchingMaxMessages(0x500000).loadConf(producerConf).create();
        }
        catch (PulsarClientException e) {
            log.error("Failed to create producer for topic {}", (Object)topic);
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            log.error("Failed to getOrCreate a PulsarClient");
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void producerFlush() throws Exception {
        if (this.singleProducer != null) {
            this.singleProducer.flush();
        } else if (this.topic2Producer != null) {
            for (Producer producer : this.topic2Producer.values()) {
                producer.flush();
            }
        }
        SerializableObject serializableObject = this.pendingRecordsLock;
        synchronized (serializableObject) {
            while (this.pendingRecords > 0L) {
                try {
                    this.pendingRecordsLock.wait();
                }
                catch (InterruptedException interruptedException) {
                    throw new RuntimeException("Flushing got interrupted while checkpointing", interruptedException);
                }
            }
        }
    }

    protected void producerClose() throws Exception {
        this.producerFlush();
        if (this.admin != null) {
            this.admin.close();
        }
        if (this.singleProducer != null) {
            this.singleProducer.close();
        } else if (this.topic2Producer != null) {
            for (Producer<byte[]> p : this.topic2Producer.values()) {
                p.close();
            }
            this.topic2Producer.clear();
        }
    }

    protected void checkErroneous() throws Exception {
        Throwable e = this.failedWrite;
        if (e != null) {
            this.failedWrite = null;
            throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void acknowledgeMessage() {
        if (this.flushOnCheckpoint) {
            SerializableObject serializableObject = this.pendingRecordsLock;
            synchronized (serializableObject) {
                --this.pendingRecords;
                if (this.pendingRecords == 0L) {
                    this.pendingRecordsLock.notifyAll();
                }
            }
        }
    }
}

