package org.apache.beam.sdk.io.solace.broker;

import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
import java.util.Queue;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.io.solace.write.UnboundedSolaceWriter;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/solace/broker/PublishResultHandler.class */
/**
 * Publish-event callback that turns every outcome delivered through
 * {@link JCSMPStreamingPublishCorrelatingEventHandler} into a {@link Solace.PublishResult} and
 * appends it to the queue handed to the constructor.
 *
 * <p>Each failure callback additionally increments the {@code batches_rejected} counter that is
 * registered under {@link UnboundedSolaceWriter}.
 */
public final class PublishResultHandler implements JCSMPStreamingPublishCorrelatingEventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(PublishResultHandler.class);

    /** Destination for every result built by this handler. */
    private final Queue<Solace.PublishResult> publishResultsQueue;

    /** Incremented once per failure callback. */
    private final Counter batchesRejectedByBroker = Metrics.counter(UnboundedSolaceWriter.class, "batches_rejected");

    /**
     * Creates a handler that appends its results to the given queue.
     *
     * @param queue queue that receives one {@link Solace.PublishResult} per callback
     */
    public PublishResultHandler(Queue<Solace.PublishResult> queue) {
        this.publishResultsQueue = queue;
    }

    /**
     * Failure callback: records an unpublished result for the given key.
     *
     * @param obj correlation key of the message; may be {@code null}
     * @param jCSMPException reported failure cause; may be {@code null}
     * @param j third callback argument; not used by this handler
     */
    public void handleErrorEx(Object obj, JCSMPException jCSMPException, long j) {
        processKey(obj, false, jCSMPException);
    }

    /**
     * Success callback: records a published result for the given key.
     *
     * @param obj correlation key of the message; may be {@code null}
     */
    public void responseReceivedEx(Object obj) {
        processKey(obj, true, null);
    }

    /**
     * Builds a {@link Solace.PublishResult} for one callback and adds it to the results queue.
     *
     * <p>The message id is the key's own id when the key is a {@link Solace.CorrelationKey} (in
     * which case the latency is recorded as well), the empty string when the key is {@code null},
     * and {@code key.toString()} for any other object.
     *
     * @param key correlation key received from the callback; may be {@code null}
     * @param isPublished {@code true} for a success callback, {@code false} for a failure
     * @param cause exception received from the callback; may be {@code null}
     */
    private void processKey(Object key, boolean isPublished, JCSMPException cause) {
        Solace.PublishResult.Builder resultBuilder = Solace.PublishResult.builder();

        final String messageId;
        if (key instanceof Solace.CorrelationKey) {
            Solace.CorrelationKey correlationKey = (Solace.CorrelationKey) key;
            messageId = correlationKey.getMessageId();
            // Elapsed time between the monotonic timestamp stored in the key and this callback.
            long latencyNanos = System.nanoTime() - correlationKey.getPublishMonotonicNanos();
            resultBuilder = resultBuilder.setLatencyNanos(latencyNanos);
        } else {
            // A null key never matches instanceof, so it ends up here together with foreign keys.
            messageId = (key == null) ? "" : key.toString();
        }

        resultBuilder = resultBuilder.setMessageId(messageId).setPublished(isPublished);

        if (isPublished) {
            if (cause != null) {
                LOG.warn("Message with id {} is published but exception is populated. Ignoring exception", messageId);
            }
        } else {
            this.batchesRejectedByBroker.inc();
            String error = (cause == null) ? "NULL - Not set by Solace" : cause.getMessage();
            resultBuilder = resultBuilder.setError(error);
        }

        this.publishResultsQueue.add(resultBuilder.build());
    }
}
