/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.sink.writer;

import java.io.IOException;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextImpl;
import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.sink.writer.topic.ProducerRegister;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicRegister;
import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class PulsarWriter<IN>
implements TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, PulsarCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
    private final PulsarSerializationSchema<IN> serializationSchema;
    private final TopicRegister<IN> topicRegister;
    private final TopicRouter<IN> topicRouter;
    private final MessageDelayer<IN> messageDelayer;
    private final DeliveryGuarantee deliveryGuarantee;
    private final PulsarSinkContext sinkContext;
    private final ProducerRegister producerRegister;
    private final AtomicLong pendingMessages = new AtomicLong(0L);

    public PulsarWriter(SinkConfiguration sinkConfiguration, PulsarSerializationSchema<IN> serializationSchema, TopicRegister<IN> topicRegister, TopicRouter<IN> topicRouter, MessageDelayer<IN> messageDelayer, @Nullable CryptoKeyReader cryptoKeyReader, Sink.InitContext initContext) {
        Preconditions.checkNotNull((Object)((Object)sinkConfiguration));
        this.serializationSchema = (PulsarSerializationSchema)Preconditions.checkNotNull(serializationSchema);
        this.topicRegister = (TopicRegister)Preconditions.checkNotNull(topicRegister);
        this.topicRouter = (TopicRouter)Preconditions.checkNotNull(topicRouter);
        this.messageDelayer = (MessageDelayer)Preconditions.checkNotNull(messageDelayer);
        Preconditions.checkNotNull((Object)initContext);
        this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
        this.sinkContext = new PulsarSinkContextImpl(initContext, sinkConfiguration);
        LOG.debug("Initialize topic metadata after creating Pulsar writer.");
        ProcessingTimeService timeService = initContext.getProcessingTimeService();
        this.topicRegister.open(sinkConfiguration, timeService);
        this.topicRouter.open(sinkConfiguration);
        try {
            SerializationSchema.InitializationContext initializationContext = initContext.asSerializationSchemaInitializationContext();
            this.serializationSchema.open(initializationContext, this.sinkContext, sinkConfiguration);
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Cannot initialize schema.", (Throwable)e);
        }
        this.producerRegister = new ProducerRegister(sinkConfiguration, cryptoKeyReader);
    }

    public void write(IN element, SinkWriter.Context context) throws IOException, InterruptedException {
        PulsarMessage<?> message = this.serializationSchema.serialize(element, this.sinkContext);
        List<String> topics = this.topicRegister.topics(element);
        String keyString = message.getKeyBytes() == null ? message.getKey() : Base64.getEncoder().encodeToString(message.getKeyBytes());
        String topic = this.topicRouter.route(element, keyString, topics, this.sinkContext);
        TypedMessageBuilder<?> builder = this.createMessageBuilder(topic, context, message);
        long deliverAt = this.messageDelayer.deliverAt(element, this.sinkContext);
        if (deliverAt > 0L) {
            builder.deliverAt(deliverAt);
        }
        if (this.deliveryGuarantee == DeliveryGuarantee.NONE) {
            builder.sendAsync();
        } else {
            this.pendingMessages.incrementAndGet();
            builder.sendAsync().whenComplete((id, ex) -> {
                this.pendingMessages.decrementAndGet();
                if (ex != null) {
                    throw new FlinkRuntimeException("Failed to send data to Pulsar " + topic, ex);
                }
                LOG.debug("Sent message to Pulsar {} with message id {}", (Object)topic, id);
            });
        }
    }

    private TypedMessageBuilder<?> createMessageBuilder(String topic, SinkWriter.Context context, PulsarMessage<?> message) {
        List<String> clusters;
        Long sequenceId;
        long eventTime;
        byte[] keyBytes;
        String key;
        Schema<?> schema = message.getSchema();
        TypedMessageBuilder<?> builder = this.producerRegister.createMessageBuilder(topic, schema);
        byte[] orderingKey = message.getOrderingKey();
        if (orderingKey != null && orderingKey.length > 0) {
            builder.orderingKey(orderingKey);
        }
        if (!Strings.isNullOrEmpty((String)(key = message.getKey()))) {
            builder.key(key);
        }
        if ((keyBytes = message.getKeyBytes()) != null) {
            builder.keyBytes(keyBytes);
        }
        if ((eventTime = message.getEventTime()) > 0L) {
            builder.eventTime(eventTime);
        } else {
            Long timestamp = context.timestamp();
            if (timestamp != null && timestamp > 0L) {
                builder.eventTime(timestamp);
            }
        }
        builder.value(message.getValue());
        Map<String, String> properties = message.getProperties();
        if (properties != null && !properties.isEmpty()) {
            builder.properties(properties);
        }
        if ((sequenceId = message.getSequenceId()) != null) {
            builder.sequenceId(sequenceId);
        }
        if ((clusters = message.getReplicationClusters()) != null && !clusters.isEmpty()) {
            builder.replicationClusters(clusters);
        }
        if (message.isDisableReplication()) {
            builder.disableReplication();
        }
        return builder;
    }

    public void flush(boolean endOfInput) throws IOException {
        if (endOfInput || this.deliveryGuarantee != DeliveryGuarantee.NONE) {
            LOG.info("Flush the pending messages to Pulsar.");
            this.producerRegister.flush();
            while (this.pendingMessages.longValue() > 0L) {
                this.producerRegister.flush();
            }
        }
    }

    public Collection<PulsarCommittable> prepareCommit() {
        if (this.deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            return this.producerRegister.prepareCommit();
        }
        return Collections.emptyList();
    }

    public void close() throws Exception {
        IOUtils.closeAll((AutoCloseable[])new AutoCloseable[]{this.topicRegister, this.producerRegister});
    }
}

