package org.apache.beam.sdk.io.solace.write;

import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.Destination;
import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.solace.SolaceIO;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.io.solace.write.UnboundedSolaceWriter;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writer {@link DoFn} that accumulates the records of a bundle and hands them to the Solace
 * producer in batches when the bundle finishes.
 *
 * <p>Each non-null element is added to the current bundle and (re)arms a processing-time timer.
 * On {@code @FinishBundle} the accumulated records are published in chunks of at most
 * {@link #SOLACE_BATCH_LIMIT} records, and {@code publishResults} is invoked. The same
 * {@code publishResults} call is made when the timer fires.
 */
@Internal
public final class UnboundedBatchedSolaceWriter extends UnboundedSolaceWriter {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedBatchedSolaceWriter.class);

    /** Offset, in seconds, of the processing-time timer armed after every accepted element. */
    private static final int ACKS_FLUSHING_INTERVAL_SECS = 10;

    /** Maximum number of records passed to a single {@code publishBatch} call on the producer. */
    private static final int SOLACE_BATCH_LIMIT = 50;

    /** Identifier tying the timer spec, the timer parameter and the timer callback together. */
    private static final String BUNDLE_FLUSHER_TIMER_ID = "bundle_flusher";

    /** Incremented by the value returned from the producer's {@code publishBatch} call. */
    private final Counter sentToBroker =
            Metrics.counter(UnboundedBatchedSolaceWriter.class, "msgs_sent_to_broker");

    // NOTE(review): this counter is registered under UnboundedSolaceWriter.class while the one
    // above uses UnboundedBatchedSolaceWriter.class. Kept as-is because changing the namespace
    // would change the reported metric name - confirm whether the difference is intentional.
    /** Incremented once for every batch whose publication threw an exception. */
    private final Counter batchesRejectedByBroker =
            Metrics.counter(UnboundedSolaceWriter.class, "batches_rejected");

    @DoFn.TimerId(BUNDLE_FLUSHER_TIMER_ID)
    private final TimerSpec bundleFlusherTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    /**
     * Creates the writer; every argument is forwarded unchanged to the
     * {@link UnboundedSolaceWriter} constructor.
     *
     * @param serializableFunction function mapping a record to its {@link Destination}
     * @param sessionServiceFactory factory forwarded to the superclass
     * @param deliveryMode delivery mode forwarded to the superclass
     * @param submissionMode submission mode forwarded to the superclass
     * @param i integer setting forwarded to the superclass (meaning not visible here - TODO
     *     confirm against {@code UnboundedSolaceWriter})
     * @param z boolean setting forwarded to the superclass (presumably the latency-metrics
     *     switch - TODO confirm against {@code UnboundedSolaceWriter})
     */
    public UnboundedBatchedSolaceWriter(SerializableFunction<Solace.Record, Destination> serializableFunction, SessionServiceFactory sessionServiceFactory, DeliveryMode deliveryMode, SolaceIO.SubmissionMode submissionMode, int i, boolean z) {
        super(serializableFunction, sessionServiceFactory, deliveryMode, submissionMode, i, z);
    }

    /**
     * Records the element timestamp as the current bundle timestamp, then buffers the record and
     * arms the flush timer. Elements with a {@code null} value are logged and dropped without
     * touching the timer.
     *
     * @param kv keyed element; only the value is buffered, the key is used for logging
     * @param timer processing-time timer set relative to now by
     *     {@link #ACKS_FLUSHING_INTERVAL_SECS} seconds
     * @param instant timestamp of the element
     */
    @DoFn.ProcessElement
    public void processElement(@DoFn.Element KV<Integer, Solace.Record> kv, @DoFn.TimerId(BUNDLE_FLUSHER_TIMER_ID) Timer timer, @DoFn.Timestamp Instant instant) {
        setCurrentBundleTimestamp(instant);

        Solace.Record record = kv.getValue();
        if (record == null) {
            LOG.error("SolaceIO.Write: Found null record with key {}. Ignoring record.", kv.getKey());
            return;
        }

        addToCurrentBundle(record);
        timer.offset(Duration.standardSeconds(ACKS_FLUSHING_INTERVAL_SECS)).setRelative();
    }

    /**
     * Publishes the buffered records in consecutive chunks of at most
     * {@link #SOLACE_BATCH_LIMIT}, clears the buffer and then calls {@code publishResults} with
     * the finish-bundle context.
     *
     * @param finishBundleContext context wrapped and handed to {@code publishResults}
     * @throws IOException declared for the {@code publishResults} call
     */
    @DoFn.FinishBundle
    public void finishBundle(DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.FinishBundleContext finishBundleContext) throws IOException {
        List<Solace.Record> currentBundle = getCurrentBundle();
        for (int start = 0; start < currentBundle.size(); start += SOLACE_BATCH_LIMIT) {
            int end = Math.min(start + SOLACE_BATCH_LIMIT, currentBundle.size());
            // subList is a view over the bundle, so it must be consumed before the clear() below.
            List<Solace.Record> batch = currentBundle.subList(start, end);
            if (!batch.isEmpty()) {
                publishBatch(batch);
            }
        }
        getCurrentBundle().clear();
        publishResults(UnboundedSolaceWriter.BeamContextWrapper.of(finishBundleContext));
    }

    /**
     * Timer callback: calls {@code publishResults} with the timer context.
     *
     * @param onTimerContext context of the fired timer
     * @throws IOException declared for the {@code publishResults} call
     */
    @DoFn.OnTimer(BUNDLE_FLUSHER_TIMER_ID)
    public void flushBundle(DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.OnTimerContext onTimerContext) throws IOException {
        // Typed as WindowedContext on purpose so the same BeamContextWrapper.of overload is chosen.
        DoFn<KV<Integer, Solace.Record>, Solace.PublishResult>.WindowedContext windowedContext = onTimerContext;
        publishResults(UnboundedSolaceWriter.BeamContextWrapper.of(windowedContext));
    }

    /**
     * Publishes one batch through the initialized producer and adds the returned count to
     * {@link #sentToBroker}. If anything throws, the rejection counter is incremented and a
     * single failed {@link Solace.PublishResult} describing the whole batch is added to the
     * published-results queue; the exception is not rethrown.
     *
     * @param list records of the batch
     */
    private void publishBatch(List<Solace.Record> list) {
        try {
            int published =
                    solaceSessionServiceWithProducer()
                            .getInitializedProducer(getSubmissionMode())
                            .publishBatch(list, shouldPublishLatencyMetrics(), getDestinationFn(), getDeliveryMode());
            this.sentToBroker.inc(published);
        } catch (Exception e) {
            this.batchesRejectedByBroker.inc();
            // NOTE(review): the latency field receives the raw System.nanoTime() reading rather
            // than an elapsed duration - behavior preserved, confirm that this is intended.
            Solace.PublishResult failure =
                    Solace.PublishResult.builder()
                            .setPublished(false)
                            .setMessageId(String.format("BATCH_OF_%d_ENTRIES", list.size()))
                            .setError(String.format("Batch could not be published after several retries. Error: %s", e.getMessage()))
                            .setLatencyNanos(System.nanoTime())
                            .build();
            solaceSessionServiceWithProducer().getPublishedResultsQueue().add(failure);
        }
    }
}
