package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.kafka;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;

/* loaded from: input_file:BOOT-INF/lib/alpha-spec-saga-akka-0.7.0.jar:org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaMessagePublisher.class */
/**
 * {@link MessagePublisher} implementation that sends {@link BaseEvent}s to a single Kafka
 * topic through a {@link KafkaTemplate} built by this class.
 *
 * <p>Record keys are serialized with {@link StringSerializer} and record values with
 * {@link JsonSerializer}.
 */
public class KafkaMessagePublisher implements MessagePublisher<BaseEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessagePublisher.class);

    /** Topic every event is sent to. */
    private final String topic;

    /** Template used to send events; configured once in the constructor. */
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Builds the producer configuration and the backing {@link KafkaTemplate}.
     *
     * @param str  value stored under the {@code bootstrap.servers} producer property
     * @param str2 name of the topic that {@link #publish(BaseEvent)} sends to
     * @param map  additional producer properties; copied before use, so the caller's map is not
     *             modified. Entries for the bootstrap servers or the key/value serializers are
     *             overwritten by the values set here.
     * @throws NullPointerException if {@code map} is {@code null}
     */
    public KafkaMessagePublisher(String str, String str2, Map<String, String> map) {
        this.topic = str2;
        Map<String, Object> producerProps = new HashMap<>(map);
        // Applied after the copy so these settings always win over caller-supplied entries.
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, str);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        this.kafkaTemplate =
                new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, Object>(producerProps));
    }

    /**
     * Sends the event to the configured topic, keyed by its global transaction id, and blocks
     * until the send future completes.
     *
     * @param baseEvent event to publish
     * @throws RuntimeException wrapping the original exception if waiting for the send is
     *                          interrupted or the send fails; when interruption is involved the
     *                          calling thread's interrupted status is restored before throwing
     */
    @Override // org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher
    public void publish(BaseEvent baseEvent) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("send [{}] {} {}", baseEvent.getGlobalTxId(), baseEvent.getType(), baseEvent.getLocalTxId());
        }
        try {
            this.kafkaTemplate.send(this.topic, baseEvent.getGlobalTxId(), baseEvent).get();
        } catch (InterruptedException e) {
            // get() was interrupted directly: the exception has no cause, so the flag must be
            // restored here or callers up the stack never observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | UnsupportedOperationException e) {
            // The interruption may also surface wrapped as the cause of the failure.
            if (e.getCause() instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }
}
