package org.apache.beam.sdk.io.solace.write;

import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.Destination;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.solace.SolaceIO;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.io.solace.write.UnboundedSolaceWriter;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.class */
public final class UnboundedStreamingSolaceWriter extends UnboundedSolaceWriter {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedStreamingSolaceWriter.class);
    private final Counter sentToBroker;
    private final Counter rejectedByBroker;

    @DoFn.StateId("current_key")
    private final StateSpec<ValueState<Integer>> currentKeySpec;

    public UnboundedStreamingSolaceWriter(SerializableFunction<Solace.Record, Destination> serializableFunction, SessionServiceFactory sessionServiceFactory, DeliveryMode deliveryMode, SolaceIO.SubmissionMode submissionMode, int i, boolean z) {
        super(serializableFunction, sessionServiceFactory, deliveryMode, submissionMode, i, z);
        this.sentToBroker = Metrics.counter(UnboundedStreamingSolaceWriter.class, "msgs_sent_to_broker");
        this.rejectedByBroker = Metrics.counter(UnboundedStreamingSolaceWriter.class, "msgs_rejected_by_broker");
        this.currentKeySpec = StateSpecs.value();
    }

    @DoFn.ProcessElement
    public void processElement(@DoFn.Element KV<Integer, Solace.Record> kv, @DoFn.Timestamp Instant instant, @DoFn.StateId("current_key") @DoFn.AlwaysFetched ValueState<Integer> valueState) {
        setCurrentBundleTimestamp(instant);
        Integer num = (Integer) valueState.read();
        Integer num2 = (Integer) kv.getKey();
        Solace.Record record = (Solace.Record) kv.getValue();
        if (num == null || !num.equals(num2)) {
            valueState.write(num2);
        }
        if (record == null) {
            LOG.error("SolaceIO.Write: Found null record with key {}. Ignoring record.", num2);
            return;
        }
        try {
            solaceSessionServiceWithProducer().getInitializedProducer(getSubmissionMode()).publishSingleMessage(record, (Destination) getDestinationFn().apply(record), shouldPublishLatencyMetrics(), getDeliveryMode());
            this.sentToBroker.inc();
        } catch (Exception e) {
            this.rejectedByBroker.inc();
            solaceSessionServiceWithProducer().getPublishedResultsQueue().add(Solace.PublishResult.builder().setPublished(false).setMessageId(record.getMessageId()).setError(String.format("Message could not be published after several retries. Error: %s", e.getMessage())).setLatencyNanos(Long.valueOf(System.nanoTime())).build());
        }
    }

    @DoFn.FinishBundle
    public void finishBundle(DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.FinishBundleContext finishBundleContext) {
        publishResults(UnboundedSolaceWriter.BeamContextWrapper.of(finishBundleContext));
    }
}
