package com.ibm.fhir.server.test.kafka;

import com.ibm.fhir.model.resource.Observation;
import com.ibm.fhir.model.resource.Patient;
import com.ibm.fhir.model.test.TestUtil;
import com.ibm.fhir.server.test.FHIRServerTestBase;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.json.JSONObject;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/ibm/fhir/server/test/kafka/KafkaNotificationTest.class */
/**
 * Integration test that checks Kafka notifications are published for the create and update
 * interactions this test performs against the FHIR server.
 *
 * <p>The test does nothing unless {@code test.kafka.enabled} is {@code true} in
 * {@code test.properties}. When enabled, a background thread consumes the configured topic and
 * counts records by their {@code operationType}; {@link #verifyResults()} then compares those
 * counts with the number of REST interactions performed by {@link #testCreateandUpdate()}.
 *
 * <p>Threading: {@code createCounter} and {@code updateCounter} are written only by the consumer
 * thread and read by the test thread, so {@code volatile} is sufficient for them. The
 * {@link KafkaConsumer} instance is created, used and closed exclusively on the consumer thread.
 */
public class KafkaNotificationTest extends FHIRServerTestBase {
    private static final Logger log = Logger.getLogger(KafkaNotificationTest.class.getName());

    /** Media type used for request bodies and accepted response types in this test. */
    private static final String FHIR_JSON = "application/fhir+json";

    /** Timeout of a single Kafka poll, and the sleep interval while waiting for notifications. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /** Upper bound on how long {@link #verifyResults()} waits for the expected notifications. */
    private static final long NOTIFICATION_WAIT_MILLIS = 10000L;

    /** Upper bound on how long the shutdown hook waits for the consumer thread to finish. */
    private static final long CONSUMER_SHUTDOWN_WAIT_MILLIS = 5000L;

    private static boolean ON = false;

    private Patient savedCreatedPatient;
    private Observation savedCreatedObservation;
    private KafkaConsumer<String, String> consumer = null;
    private Properties connectionProps = null;
    private Thread consumerThread = null;

    // Shared between the test thread and the consumer thread; volatile for cross-thread visibility.
    private volatile boolean keepRunning = true;
    private volatile int createCounter = 0;
    private volatile int updateCounter = 0;

    private int expectedCreateCounterValue = 0;
    private int expectedUpdatedCounterValue = 0;

    /**
     * Reads the {@code test.kafka.enabled} flag and, when it is set, starts the background
     * consumer thread.
     *
     * @throws Exception if the test properties cannot be read
     */
    @BeforeClass
    public void setup() throws Exception {
        ON = Boolean.parseBoolean(
                TestUtil.readTestProperties("test.properties").getProperty("test.kafka.enabled", "false"));
        if (ON) {
            consumerThread = new Thread(this::messageConsumer, "kafka-notification-consumer");
            // Daemon so a consumer that was never told to stop cannot keep the JVM alive.
            consumerThread.setDaemon(true);
            consumerThread.start();
        }
    }

    /**
     * Stops the consumer thread regardless of how the test methods ended and waits a bounded
     * time for it to close its Kafka consumer.
     *
     * @throws InterruptedException if interrupted while waiting for the consumer thread
     */
    @AfterClass(alwaysRun = true)
    public void stopMessageConsumer() throws InterruptedException {
        keepRunning = false;
        if (consumerThread != null) {
            consumerThread.join(CONSUMER_SHUTDOWN_WAIT_MILLIS);
        }
    }

    /**
     * Consumer loop: subscribes to the notification topic and counts "create" and "update"
     * records until {@code keepRunning} is cleared. Any failure is logged rather than
     * propagated, and the consumer is always closed before the method returns.
     */
    public void messageConsumer() {
        log.entering(getClass().getName(), "messageConsumer");
        try {
            this.connectionProps = new Properties();
            this.connectionProps.put("bootstrap.servers", getKafkaConnectionInfo());
            this.connectionProps.put("client.id", "fhir-server");
            this.connectionProps.put("group.id", "test-group");
            this.connectionProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            this.connectionProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            this.consumer = new KafkaConsumer<>(this.connectionProps);
            this.consumer.subscribe(Arrays.asList(getKafkaTopicName()));
            System.out.println("Subscribed to topic " + getKafkaTopicName());
            while (this.keepRunning) {
                ConsumerRecords<String, String> records =
                        this.consumer.poll(Duration.of(POLL_INTERVAL_MILLIS, ChronoUnit.MILLIS));
                System.out.println("polling msges : " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("kafka record : " + record.value());
                    countOperation(record.value());
                    System.out.println("createCounter" + this.createCounter);
                }
            }
        } catch (Throwable th) {
            log.log(Level.SEVERE, "Caught exception while initializing Kafka consumer: ", th);
        } finally {
            closeConsumer();
            log.exiting(getClass().getName(), "messageConsumer");
        }
    }

    /**
     * Increments the create or update counter according to the record's {@code operationType}.
     * A record that cannot be parsed is logged and skipped so that a single bad message does
     * not terminate the consumer loop.
     */
    private void countOperation(String payload) {
        try {
            String operationType = new JSONObject(payload).getString("operationType");
            if ("create".equalsIgnoreCase(operationType)) {
                this.createCounter++;
            }
            if ("update".equalsIgnoreCase(operationType)) {
                this.updateCounter++;
            }
        } catch (Exception e) {
            log.log(Level.WARNING, "Ignoring Kafka record that could not be parsed: " + payload, e);
        }
    }

    /** Closes the Kafka consumer if it was created; must run on the consumer thread. */
    private void closeConsumer() {
        if (this.consumer == null) {
            return;
        }
        try {
            this.consumer.close();
        } catch (Exception e) {
            log.log(Level.WARNING, "Failed to close Kafka consumer", e);
        }
    }

    /**
     * Waits for the consumer thread to observe the expected notifications, then asserts that
     * the observed counts equal the expected counts. The consumer is told to stop afterwards
     * whether or not the assertions pass.
     */
    @Test(dependsOnMethods = {"testCreateandUpdate"})
    public void verifyResults() {
        if (!ON) {
            System.out.println("Skipping results verification, disabled");
            return;
        }
        try {
            // Notifications arrive asynchronously; give the consumer a bounded time to catch up.
            awaitExpectedNotifications();
            AssertJUnit.assertEquals(this.expectedCreateCounterValue, this.createCounter);
            AssertJUnit.assertEquals(this.expectedUpdatedCounterValue, this.updateCounter);
        } finally {
            this.keepRunning = false;
        }
    }

    /**
     * Sleeps in short intervals until both counters have reached their expected values, the
     * wait budget is exhausted, the consumer thread has died, or this thread is interrupted.
     */
    private void awaitExpectedNotifications() {
        long deadline = System.currentTimeMillis() + NOTIFICATION_WAIT_MILLIS;
        while ((this.createCounter < this.expectedCreateCounterValue
                        || this.updateCounter < this.expectedUpdatedCounterValue)
                && System.currentTimeMillis() < deadline
                && this.consumerThread != null
                && this.consumerThread.isAlive()) {
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Preserve the interrupt status and let the assertions report the current state.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Creates a Patient, creates an Observation for that patient, then updates the Observation,
     * recording how many create and update notifications are expected as a result.
     *
     * @throws Exception if a test resource cannot be loaded or built
     */
    @Test(groups = {"kafka-notifications"})
    public void testCreateandUpdate() throws Exception {
        if (!ON) {
            System.out.println("Skipping create and update for kafka, disabled");
            return;
        }
        WebTarget webTarget = getWebTarget();

        // Create the patient and read it back.
        Response patientResponse = webTarget.path("Patient").request()
                .post(Entity.entity(TestUtil.readLocalResource("Patient_JohnDoe.json"), FHIR_JSON), Response.class);
        assertResponse(patientResponse, Response.Status.CREATED.getStatusCode());
        this.savedCreatedPatient = webTarget.path("Patient/" + getLocationLogicalId(patientResponse))
                .request(FHIR_JSON).get().readEntity(Patient.class);
        this.expectedCreateCounterValue++;

        // Create an observation that refers to the patient and read it back.
        Response observationResponse = webTarget.path("Observation").request()
                .post(Entity.entity(
                        TestUtil.buildPatientObservation(this.savedCreatedPatient.getId(), "Observation1.json"),
                        FHIR_JSON), Response.class);
        assertResponse(observationResponse, Response.Status.CREATED.getStatusCode());
        this.savedCreatedObservation = webTarget.path("Observation/" + getLocationLogicalId(observationResponse))
                .request(FHIR_JSON).get().readEntity(Observation.class);
        this.expectedCreateCounterValue++;

        // Update the observation, reusing the id assigned by the server on create.
        Observation updatedObservation =
                TestUtil.buildPatientObservation(this.savedCreatedPatient.getId(), "Observation2.json")
                        .toBuilder().id(this.savedCreatedObservation.getId()).build();
        Response updateResponse = webTarget.path("Observation/" + updatedObservation.getId()).request()
                .put(Entity.entity(updatedObservation, FHIR_JSON), Response.class);
        assertResponse(updateResponse, Response.Status.OK.getStatusCode());
        // The read-back response is not inspected.
        webTarget.path("Observation/" + getLocationLogicalId(updateResponse)).request(FHIR_JSON).get();
        this.expectedUpdatedCounterValue++;

        System.out.println("expectedUpdatedCounterValue---------" + this.expectedUpdatedCounterValue);
        System.out.println("expectedCreateCounterValue---------" + this.expectedCreateCounterValue);
        // The consumer is deliberately left running here: notifications are delivered
        // asynchronously, so it is stopped only after verifyResults() (or the @AfterClass hook).
    }
}
