package com.ibm.fhir.server.notification.kafka;

import com.ibm.fhir.audit.configuration.type.KafkaType;
import com.ibm.fhir.config.FHIRConfigHelper;
import com.ibm.fhir.config.FHIRConfiguration;
import com.ibm.fhir.server.notification.FHIRNotificationEvent;
import com.ibm.fhir.server.notification.FHIRNotificationException;
import com.ibm.fhir.server.notification.FHIRNotificationService;
import com.ibm.fhir.server.notification.FHIRNotificationSubscriber;
import com.ibm.fhir.server.notification.FHIRNotificationUtil;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/* loaded from: input_file:WEB-INF/lib/fhir-server-4.10.1.jar:com/ibm/fhir/server/notification/kafka/FHIRNotificationKafkaPublisher.class */
/**
 * Notification subscriber that forwards every {@link FHIRNotificationEvent} it receives to a Kafka topic.
 *
 * <p>An instance created through the public constructor builds a {@link KafkaProducer} from the supplied
 * connection properties and registers itself with the {@link FHIRNotificationService}. Events are serialized
 * with {@link FHIRNotificationUtil#toJsonString} and sent either synchronously or asynchronously depending
 * on the {@code FHIRConfiguration.PROPERTY_KAFKA_SYNC} configuration property (default: asynchronous).
 */
public class FHIRNotificationKafkaPublisher implements FHIRNotificationSubscriber {
    private static final Logger LOG = Logger.getLogger(FHIRNotificationKafkaPublisher.class.getName());
    private static final FHIRNotificationService service = FHIRNotificationService.getInstance();

    private String topicName = null;
    private Producer<String, String> producer = null;
    private Properties kafkaProps = null;

    /**
     * Kafka send callback that logs the outcome of a single publication attempt.
     */
    public class KafkaPublisherCallback implements Callback {
        private final FHIRNotificationEvent event;
        private final String notificationEvent;
        private final String topicId;

        /**
         * @param event             the notification event being published
         * @param notificationEvent the serialized (JSON) form of the event, used in failure messages
         * @param topicId           human-readable identifier of the target topic, used in failure messages
         */
        public KafkaPublisherCallback(FHIRNotificationEvent event, String notificationEvent, String topicId) {
            this.event = event;
            this.notificationEvent = notificationEvent;
            this.topicId = topicId;
        }

        /**
         * Logs success at INFO when {@code exc} is {@code null}; otherwise logs the failure at SEVERE
         * together with the exception.
         *
         * @param recordMetadata metadata of the sent record (not used here)
         * @param exc            the send failure, or {@code null} if the send succeeded
         */
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            final String className = getClass().getName();
            LOG.entering(className, "onCompletion");
            try {
                if (exc == null) {
                    LOG.info("Successfully published kafka notification event for resource: " + this.event.getLocation());
                } else {
                    LOG.log(Level.SEVERE, buildNotificationErrorMessage(this.topicId, this.notificationEvent), exc);
                }
            } finally {
                LOG.exiting(className, "onCompletion");
            }
        }
    }

    /**
     * Creates an uninitialized publisher: no producer is created and nothing is subscribed.
     * Intended for subclasses.
     */
    protected FHIRNotificationKafkaPublisher() {
    }

    /**
     * Creates a publisher for the given topic and registers it with the notification service.
     *
     * @param topicName  name of the Kafka topic to publish to
     * @param kafkaProps Kafka connection properties; must contain {@code bootstrap.servers}. The serializer
     *                   and {@code client.id} entries of this object are overwritten during initialization.
     * @throws IllegalStateException if initialization fails for any reason
     */
    public FHIRNotificationKafkaPublisher(String topicName, Properties kafkaProps) {
        final String className = getClass().getName();
        LOG.entering(className, "ctor");
        try {
            init(topicName, kafkaProps);
        } finally {
            LOG.exiting(className, "ctor");
        }
    }

    /**
     * Stores the configuration, creates the Kafka producer and subscribes this instance to the
     * notification service. Any failure is logged at SEVERE and rethrown as {@link IllegalStateException}.
     */
    private void init(String topicName, Properties kafkaProps) {
        final String className = getClass().getName();
        LOG.entering(className, "init");
        try {
            this.topicName = topicName;
            this.kafkaProps = kafkaProps;
            if (LOG.isLoggable(Level.FINER)) {
                // NOTE(review): this dumps every connection property; if the properties carry
                // credentials they will end up in the log at FINER level - confirm this is acceptable.
                LOG.finer("Kafka publisher is configured with the following properties:\n" + this.kafkaProps.toString());
                LOG.finer("Topic name: " + this.topicName);
            }

            // These settings are forced so the producer always matches Producer<String, String>.
            this.kafkaProps.put("key.serializer", KafkaType.KAFKA_DEFAULT_SERIALIZER);
            this.kafkaProps.put("value.serializer", KafkaType.KAFKA_DEFAULT_SERIALIZER);
            this.kafkaProps.put("client.id", "fhir-server");

            String bootstrapServers = this.kafkaProps.getProperty("bootstrap.servers");
            if (bootstrapServers == null) {
                throw new IllegalStateException("The bootstrap.servers property was missing from the Kafka connection properties.");
            }

            this.producer = new KafkaProducer<>(this.kafkaProps);
            service.subscribe(this);
            LOG.info("Initialized Kafka publisher for topic '" + topicName + "' using bootstrap servers: " + bootstrapServers + ".");
        } catch (Throwable th) {
            // Every failure (including the missing-property check above) surfaces as IllegalStateException.
            String msg = "Caught exception while initializing Kafka publisher.";
            LOG.log(Level.SEVERE, msg, th);
            throw new IllegalStateException(msg, th);
        } finally {
            LOG.exiting(className, "init");
        }
    }

    /**
     * Closes the underlying Kafka producer, if one was created.
     */
    public void shutdown() {
        final String className = getClass().getName();
        LOG.entering(className, "shutdown");
        try {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("Shutting down Kafka publisher for topic: '" + this.topicName + "'.");
            }
            if (this.producer != null) {
                this.producer.close();
            }
        } finally {
            LOG.exiting(className, "shutdown");
        }
    }

    /**
     * Serializes the event to JSON and publishes it to the configured topic.
     *
     * <p>When {@code FHIRConfiguration.PROPERTY_KAFKA_SYNC} is {@code true} the call blocks until the
     * send completes; otherwise the record is handed to the producer and the method returns immediately,
     * with the outcome reported through {@link KafkaPublisherCallback}.
     *
     * @param event the notification event to publish
     * @throws FHIRNotificationException if serialization or publication fails; the original failure is
     *                                   attached as the cause
     */
    @Override
    public void notify(FHIRNotificationEvent event) throws FHIRNotificationException {
        final String className = getClass().getName();
        LOG.entering(className, "notify");
        String topicId = "[" + this.kafkaProps.getProperty("bootstrap.servers") + "]/" + this.topicName;
        String json = null;
        try {
            json = FHIRNotificationUtil.toJsonString(event, true);
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("Publishing kafka notification event to topic '" + topicId + "',\nmessage: " + json);
            }

            boolean sync = FHIRConfigHelper.getBooleanProperty(FHIRConfiguration.PROPERTY_KAFKA_SYNC, false).booleanValue();
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(this.topicName, json);
            Callback callback = new KafkaPublisherCallback(event, json, topicId);
            if (sync) {
                // Block until the broker acknowledges (or rejects) the record.
                RecordMetadata metadata = this.producer.send(producerRecord, callback).get();
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Record Produced to Topic '" + metadata.topic() + "' at time " + metadata.timestamp());
                }
            } else {
                this.producer.send(producerRecord, callback);
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("Returned from async kafka send...");
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw publicationFailure(topicId, json, e);
        } catch (Exception e) {
            throw publicationFailure(topicId, json, e);
        } finally {
            LOG.exiting(className, "notify");
        }
    }

    /**
     * Logs a publication failure at SEVERE and returns the exception to be thrown to the caller.
     */
    private FHIRNotificationException publicationFailure(String topicId, String json, Exception cause) {
        // json is still null when serialization itself was the step that failed.
        String msg = buildNotificationErrorMessage(topicId, json == null ? "<null>" : json);
        LOG.log(Level.SEVERE, msg, cause);
        return new FHIRNotificationException(msg, cause);
    }

    /**
     * Builds the message used when a notification could not be published.
     *
     * @param topicId           human-readable identifier of the target topic
     * @param notificationEvent serialized form of the event that failed to publish
     * @return the formatted failure message
     */
    public String buildNotificationErrorMessage(String topicId, String notificationEvent) {
        return String.format("Kafka publication failure; topic '%s'\nNotification event: %s\n.", topicId, notificationEvent);
    }
}
