package org.apache.beam.sdk.io.solace.broker;

import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.FlowReceiver;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.StaleSessionException;
import java.io.IOException;
import org.apache.beam.sdk.io.solace.RetryCallableManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.class */
/**
 * {@link MessageReceiver} implementation that delegates to a Solace JCSMP {@link FlowReceiver}.
 *
 * <p>Starting the flow goes through a {@link RetryCallableManager}; receiving translates JCSMP
 * failures into {@link IOException} so callers only deal with the {@code MessageReceiver}
 * contract.
 */
public class SolaceMessageReceiver implements MessageReceiver {
    /** Value handed to {@link FlowReceiver#receive(int)} on every {@link #receive()} call. */
    public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100;

    private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageReceiver.class);

    private final FlowReceiver flowReceiver;
    private final RetryCallableManager retryCallableManager = RetryCallableManager.create();

    /**
     * Wraps the given flow receiver. The flow is not started here; call {@link #start()}.
     *
     * @param flowReceiver the JCSMP flow to delegate to
     */
    public SolaceMessageReceiver(FlowReceiver flowReceiver) {
        this.flowReceiver = flowReceiver;
    }

    /** Starts the underlying flow receiver via {@link #startFlowReceiver()}. */
    @Override
    public void start() {
        startFlowReceiver();
    }

    /**
     * Invokes {@link FlowReceiver#start()} through the retry manager, registering
     * {@link JCSMPException} as the exception class passed to it.
     */
    private void startFlowReceiver() {
        retryCallableManager.retryCallable(
                () -> {
                    flowReceiver.start();
                    // The callable has to yield a value; the result itself is not used.
                    return 0;
                },
                ImmutableSet.of(JCSMPException.class));
    }

    /**
     * Reports whether this receiver can no longer be used.
     *
     * @return {@code true} when there is no flow receiver or the flow receiver reports itself
     *     closed
     */
    @Override
    public boolean isClosed() {
        if (flowReceiver == null) {
            return true;
        }
        return flowReceiver.isClosed();
    }

    /**
     * Asks the flow receiver for the next message, passing
     * {@link #DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS} as its argument.
     *
     * @return whatever {@link FlowReceiver#receive(int)} returns
     * @throws IOException wrapping any {@link JCSMPException} raised by the flow receiver; for a
     *     {@link StaleSessionException} the flow receiver is restarted before the exception is
     *     rethrown
     */
    @Override
    public BytesXMLMessage receive() throws IOException {
        try {
            return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS);
        } catch (StaleSessionException staleSession) {
            LOG.warn("SolaceIO: Caught StaleSessionException, restarting the FlowReceiver.");
            // Restart first so a later receive() can work, then still surface this failure.
            startFlowReceiver();
            throw new IOException(staleSession);
        } catch (JCSMPException jcsmpFailure) {
            throw new IOException(jcsmpFailure);
        }
    }

    /** Closes the flow receiver unless {@link #isClosed()} already reports it closed. */
    @Override
    public void close() {
        if (!isClosed()) {
            flowReceiver.close();
        }
    }
}
