package org.apache.beam.sdk.io.solace.read;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.solace.broker.SempClient;
import org.apache.beam.sdk.io.solace.broker.SessionService;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@VisibleForTesting
/* loaded from: input_file:org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.class */
public class UnboundedSolaceReader<T> extends UnboundedSource.UnboundedReader<T> {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSolaceReader.class);
    private final UnboundedSolaceSource<T> currentSource;
    private final WatermarkPolicy<T> watermarkPolicy;
    private final SempClient sempClient;
    private BytesXMLMessage solaceOriginalRecord;
    private T solaceMappedRecord;
    private SessionService sessionService;
    AtomicBoolean active = new AtomicBoolean(true);
    private final Queue<BytesXMLMessage> elementsToCheckpoint = new ArrayDeque();

    public UnboundedSolaceReader(UnboundedSolaceSource<T> unboundedSolaceSource) {
        this.currentSource = unboundedSolaceSource;
        this.watermarkPolicy = WatermarkPolicy.create(unboundedSolaceSource.getTimestampFn(), unboundedSolaceSource.getWatermarkIdleDurationThreshold());
        this.sessionService = unboundedSolaceSource.getSessionServiceFactory().create();
        this.sempClient = unboundedSolaceSource.getSempClientFactory().create();
    }

    public boolean start() {
        populateSession();
        ((SessionService) Preconditions.checkNotNull(this.sessionService)).getReceiver().start();
        return advance();
    }

    public void populateSession() {
        if (this.sessionService == null) {
            this.sessionService = m16getCurrentSource().getSessionServiceFactory().create();
        }
        if (this.sessionService.isClosed()) {
            ((SessionService) Preconditions.checkNotNull(this.sessionService)).connect();
        }
    }

    public boolean advance() {
        try {
            BytesXMLMessage receive = ((SessionService) Preconditions.checkNotNull(this.sessionService)).getReceiver().receive();
            if (receive == null) {
                return false;
            }
            this.elementsToCheckpoint.add(receive);
            this.solaceOriginalRecord = receive;
            this.solaceMappedRecord = (T) m16getCurrentSource().getParseFn().apply(receive);
            this.watermarkPolicy.update(this.solaceMappedRecord);
            return true;
        } catch (IOException e) {
            LOG.warn("SolaceIO.Read: Exception when pulling messages from the broker.", e);
            return false;
        }
    }

    public void close() {
        this.active.set(false);
        ((SessionService) Preconditions.checkNotNull(this.sessionService)).close();
    }

    public Instant getWatermark() {
        return ((SessionService) Preconditions.checkNotNull(this.sessionService)).getReceiver().isEOF() ? BoundedWindow.TIMESTAMP_MAX_VALUE : this.watermarkPolicy.getWatermark();
    }

    public UnboundedSource.CheckpointMark getCheckpointMark() {
        ArrayList arrayList = new ArrayList();
        while (!this.elementsToCheckpoint.isEmpty()) {
            BytesXMLMessage poll = this.elementsToCheckpoint.poll();
            if (poll != null) {
                arrayList.add(poll);
            }
        }
        return new SolaceCheckpointMark(this.active, arrayList);
    }

    public T getCurrent() throws NoSuchElementException {
        if (this.solaceMappedRecord == null) {
            throw new NoSuchElementException();
        }
        return this.solaceMappedRecord;
    }

    public byte[] getCurrentRecordId() throws NoSuchElementException {
        if (this.solaceOriginalRecord == null) {
            throw new NoSuchElementException();
        }
        return this.solaceOriginalRecord.getApplicationMessageId() != null ? ((BytesXMLMessage) Preconditions.checkNotNull(this.solaceOriginalRecord)).getApplicationMessageId().getBytes(StandardCharsets.UTF_8) : ((BytesXMLMessage) Preconditions.checkNotNull(this.solaceOriginalRecord)).getReplicationGroupMessageId().toString().getBytes(StandardCharsets.UTF_8);
    }

    /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
    public UnboundedSolaceSource<T> m16getCurrentSource() {
        return this.currentSource;
    }

    public Instant getCurrentTimestamp() throws NoSuchElementException {
        if (getCurrent() == null) {
            throw new NoSuchElementException();
        }
        return (Instant) this.currentSource.getTimestampFn().apply(getCurrent());
    }

    public long getTotalBacklogBytes() {
        try {
            return this.sempClient.getBacklogBytes(this.currentSource.getQueue().getName());
        } catch (IOException e) {
            LOG.warn("SolaceIO.Read: Could not query backlog bytes. Returning BACKLOG_UNKNOWN", e);
            return -1L;
        }
    }
}
