package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.kafka;

import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractActorEventChannel;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.properties.SpecSagaAkkaProperties;
import org.eclipse.persistence.exceptions.JAXBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/alpha-spec-saga-akka-0.7.0.jar:org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/kafka/KafkaActorEventChannel.class */
public class KafkaActorEventChannel extends AbstractActorEventChannel {
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final KafkaMessagePublisher kafkaMessagePublisher;

    public KafkaActorEventChannel(SpecSagaAkkaProperties specSagaAkkaProperties, MetricsService metricsService) {
        super(metricsService);
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", specSagaAkkaProperties.getChannel().getKafka().getBootstrapServers());
        hashMap.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Integer.valueOf(JAXBException.NO_OBJECT_FACTORY_OR_JAXB_INDEX_IN_PATH));
        AdminClient create = KafkaAdminClient.create((Map<String, Object>) hashMap);
        Throwable th = null;
        try {
            try {
                create.createTopics(Collections.singleton(new NewTopic(specSagaAkkaProperties.getChannel().getKafka().getTopic(), specSagaAkkaProperties.getChannel().getKafka().getNumPartitions(), specSagaAkkaProperties.getChannel().getKafka().getReplicationFactor()))).values().get(specSagaAkkaProperties.getChannel().getKafka().getTopic()).get();
            } catch (InterruptedException | ExecutionException e) {
                if (e.getCause() instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw new RuntimeException(e.getMessage(), e);
                }
            }
            this.kafkaMessagePublisher = new KafkaMessagePublisher(specSagaAkkaProperties.getChannel().getKafka().getBootstrapServers(), specSagaAkkaProperties.getChannel().getKafka().getTopic(), specSagaAkkaProperties.getChannel().getKafka().getProducer());
            LOG.info("Kafka Channel Init");
        } finally {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    create.close();
                }
            }
        }
    }

    @Override // org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractActorEventChannel
    public void sendTo(BaseEvent baseEvent) {
        this.kafkaMessagePublisher.publish(baseEvent);
    }
}
