package org.apache.beam.sdk.io.solace.write;

import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.Destination;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPSendMultipleEntry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.solace.SolaceIO;
import org.apache.beam.sdk.io.solace.broker.SessionService;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.class */
public abstract class UnboundedSolaceWriter extends DoFn<KV<Integer, Solace.Record>, Solace.PublishResult> {
    static final int SOLACE_BATCH_LIMIT = 50;
    private final SerializableFunction<Solace.Record, Destination> destinationFn;
    private final SessionServiceFactory sessionServiceFactory;
    private final DeliveryMode deliveryMode;
    private final SolaceIO.SubmissionMode submissionMode;
    private final int producersMapCardinality;
    private final boolean publishLatencyMetrics;
    private final List<Solace.Record> batchToEmit;
    private Instant bundleTimestamp;
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSolaceWriter.class);
    private static final AtomicInteger bundleProducerIndexCounter = new AtomicInteger();
    private final Distribution latencyPublish = Metrics.distribution(SolaceIO.Write.class, "latency_publish_ms");
    private final Distribution latencyErrors = Metrics.distribution(SolaceIO.Write.class, "latency_failed_ms");
    private int currentBundleProducerIndex = 0;
    final UUID writerTransformUuid = UUID.randomUUID();

    /* loaded from: input_file:org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter$BeamContextWrapper.class */
    static class BeamContextWrapper {
        private DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.WindowedContext windowedContext;
        private DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.FinishBundleContext finishBundleContext;

        private BeamContextWrapper() {
        }

        public static BeamContextWrapper of(DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.WindowedContext windowedContext) {
            BeamContextWrapper beamContextWrapper = new BeamContextWrapper();
            beamContextWrapper.windowedContext = windowedContext;
            return beamContextWrapper;
        }

        public static BeamContextWrapper of(DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.FinishBundleContext finishBundleContext) {
            BeamContextWrapper beamContextWrapper = new BeamContextWrapper();
            beamContextWrapper.finishBundleContext = finishBundleContext;
            return beamContextWrapper;
        }

        public <T> void output(TupleTag<T> tupleTag, T t, Instant instant, BoundedWindow boundedWindow) {
            if (this.windowedContext != null) {
                this.windowedContext.output(tupleTag, t);
                return;
            }
            if (this.finishBundleContext == null) {
                throw new IllegalStateException("SolaceIO.Write.UnboundedSolaceWriter.Context: No context provided");
            }
            if (instant == null) {
                throw new IllegalStateException("SolaceIO.Write.UnboundedSolaceWriter.Context: Timestamp is required for a FinishBundleContext.");
            }
            if (boundedWindow == null) {
                throw new IllegalStateException("SolaceIO.Write.UnboundedSolaceWriter.Context: BoundedWindow is required for a FinishBundleContext.");
            }
            this.finishBundleContext.output(tupleTag, t, instant, boundedWindow);
        }
    }

    public UnboundedSolaceWriter(SerializableFunction<Solace.Record, Destination> serializableFunction, SessionServiceFactory sessionServiceFactory, DeliveryMode deliveryMode, SolaceIO.SubmissionMode submissionMode, int i, boolean z) {
        this.destinationFn = serializableFunction;
        this.sessionServiceFactory = sessionServiceFactory;
        this.sessionServiceFactory.setSubmissionMode(submissionMode);
        this.deliveryMode = deliveryMode;
        this.submissionMode = submissionMode;
        this.producersMapCardinality = i;
        this.publishLatencyMetrics = z;
        this.batchToEmit = new ArrayList();
    }

    @DoFn.Teardown
    public void teardown() {
        SolaceWriteSessionsHandler.disconnectFromSolace(this.sessionServiceFactory, this.producersMapCardinality, this.writerTransformUuid);
    }

    public void updateProducerIndex() {
        this.currentBundleProducerIndex = bundleProducerIndexCounter.getAndIncrement() % this.producersMapCardinality;
    }

    @DoFn.StartBundle
    public void startBundle() {
        updateProducerIndex();
        this.batchToEmit.clear();
    }

    public SessionService solaceSessionServiceWithProducer() {
        return SolaceWriteSessionsHandler.getSessionServiceWithProducer(this.currentBundleProducerIndex, this.sessionServiceFactory, this.writerTransformUuid);
    }

    public void publishResults(BeamContextWrapper beamContextWrapper) {
        long j = 0;
        long j2 = 0;
        long j3 = Long.MAX_VALUE;
        long j4 = 0;
        long j5 = 0;
        long j6 = 0;
        long j7 = Long.MAX_VALUE;
        long j8 = 0;
        Queue<Solace.PublishResult> publishedResultsQueue = solaceSessionServiceWithProducer().getPublishedResultsQueue();
        Solace.PublishResult poll = publishedResultsQueue.poll();
        if (poll != null && getCurrentBundleTimestamp() == null) {
            setCurrentBundleTimestamp(Instant.now());
        }
        while (poll != null) {
            Long latencyNanos = poll.getLatencyNanos();
            if (latencyNanos == null && shouldPublishLatencyMetrics()) {
                LOG.error("SolaceIO.Write: Latency is null but user asked for latency metrics. This may be a bug.");
            }
            if (latencyNanos != null) {
                if (poll.getPublished().booleanValue()) {
                    j += latencyNanos.longValue();
                    j2++;
                    j3 = Math.min(j3, latencyNanos.longValue());
                    j4 = Math.max(j4, latencyNanos.longValue());
                } else {
                    j5 += latencyNanos.longValue();
                    j6++;
                    j7 = Math.min(j7, latencyNanos.longValue());
                    j8 = Math.max(j8, latencyNanos.longValue());
                }
            }
            if (poll.getPublished().booleanValue()) {
                beamContextWrapper.output(SolaceIO.Write.SUCCESSFUL_PUBLISH_TAG, poll, getCurrentBundleTimestamp(), GlobalWindow.INSTANCE);
            } else {
                try {
                    beamContextWrapper.output(SolaceIO.Write.FAILED_PUBLISH_TAG, BadRecord.fromExceptionInformation(poll, (Coder) null, (Exception) null, (String) Optional.ofNullable(poll.getError()).orElse("SolaceIO.Write: unknown error.")), getCurrentBundleTimestamp(), GlobalWindow.INSTANCE);
                } catch (IOException e) {
                }
            }
            poll = publishedResultsQueue.poll();
        }
        if (shouldPublishLatencyMetrics()) {
            if (j2 > 0) {
                getPublishLatencyMetric().update(TimeUnit.NANOSECONDS.toMillis(j), j2, TimeUnit.NANOSECONDS.toMillis(j3), TimeUnit.NANOSECONDS.toMillis(j4));
            }
            if (j6 > 0) {
                getFailedLatencyMetric().update(TimeUnit.NANOSECONDS.toMillis(j5), j6, TimeUnit.NANOSECONDS.toMillis(j7), TimeUnit.NANOSECONDS.toMillis(j8));
            }
        }
    }

    public BytesXMLMessage createSingleMessage(Solace.Record record, boolean z) {
        BytesXMLMessage createBytesXMLMessage = JCSMPFactory.onlyInstance().createBytesXMLMessage();
        createBytesXMLMessage.writeBytes(record.getPayload());
        Long senderTimestamp = record.getSenderTimestamp();
        if (senderTimestamp == null) {
            LOG.error("SolaceIO.Write: Record with id {} has no sender timestamp. Using current worker clock as timestamp.", record.getMessageId());
            senderTimestamp = Long.valueOf(System.currentTimeMillis());
        }
        createBytesXMLMessage.setSenderTimestamp(senderTimestamp.longValue());
        createBytesXMLMessage.setDeliveryMode(getDeliveryMode());
        if (z) {
            createBytesXMLMessage.setCorrelationKey(Solace.CorrelationKey.builder().setMessageId(record.getMessageId()).setPublishMonotonicNanos(System.nanoTime()).build());
        } else {
            createBytesXMLMessage.setCorrelationKey(record.getMessageId());
        }
        createBytesXMLMessage.setApplicationMessageId(record.getMessageId());
        return createBytesXMLMessage;
    }

    public JCSMPSendMultipleEntry[] createMessagesArray(Iterable<Solace.Record> iterable, boolean z) {
        ArrayList newArrayList = Lists.newArrayList(iterable);
        if (newArrayList.size() > SOLACE_BATCH_LIMIT) {
            LOG.error("SolaceIO.Write: Trying to create a batch of {}, but Solace supports a maximum of {}. The batch will likely be rejected by Solace.", Integer.valueOf(newArrayList.size()), Integer.valueOf(SOLACE_BATCH_LIMIT));
        }
        JCSMPSendMultipleEntry[] jCSMPSendMultipleEntryArr = new JCSMPSendMultipleEntry[newArrayList.size()];
        for (int i = 0; i < newArrayList.size(); i++) {
            Solace.Record record = (Solace.Record) newArrayList.get(i);
            jCSMPSendMultipleEntryArr[i] = JCSMPFactory.onlyInstance().createSendMultipleEntry(createSingleMessage(record, z), (Destination) getDestinationFn().apply(record));
        }
        return jCSMPSendMultipleEntryArr;
    }

    public int getProducersMapCardinality() {
        return this.producersMapCardinality;
    }

    public Distribution getPublishLatencyMetric() {
        return this.latencyPublish;
    }

    public Distribution getFailedLatencyMetric() {
        return this.latencyErrors;
    }

    public boolean shouldPublishLatencyMetrics() {
        return this.publishLatencyMetrics;
    }

    public SerializableFunction<Solace.Record, Destination> getDestinationFn() {
        return this.destinationFn;
    }

    public DeliveryMode getDeliveryMode() {
        return this.deliveryMode;
    }

    public SolaceIO.SubmissionMode getSubmissionMode() {
        return this.submissionMode;
    }

    public void addToCurrentBundle(Solace.Record record) {
        this.batchToEmit.add(record);
    }

    public List<Solace.Record> getCurrentBundle() {
        return this.batchToEmit;
    }

    public Instant getCurrentBundleTimestamp() {
        return this.bundleTimestamp;
    }

    public void setCurrentBundleTimestamp(Instant instant) {
        if (this.bundleTimestamp == null || instant.isBefore(this.bundleTimestamp)) {
            this.bundleTimestamp = instant;
        }
    }
}
