package org.apache.beam.sdk.io.solace.read;

import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.Queue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.solace.broker.SempClientFactory;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link UnboundedSource} that reads elements of type {@code T} from a Solace {@link Queue}.
 *
 * <p>The source carries everything a reader needs: the queue, the factories used to reach the
 * broker, the function that parses a raw {@link BytesXMLMessage} into {@code T}, and the function
 * that extracts an event timestamp from a parsed element.
 *
 * <p>Splitting creates several identically configured copies of this source. The number of copies
 * is capped by {@code maxNumConnections} (when set) and forced to at most one when the broker
 * reports the queue as exclusive.
 *
 * @param <T> the type of the elements produced by this source
 */
@Internal
public class UnboundedSolaceSource<T> extends UnboundedSource<T, SolaceCheckpointMark> {
    private static final long serialVersionUID = 42L;
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSolaceSource.class);

    private final Queue queue;
    /** Upper bound on the number of splits; {@code null} means no upper bound is applied. */
    private final Integer maxNumConnections;
    private final Coder<T> coder;
    private final boolean enableDeduplication;
    private final SempClientFactory sempClientFactory;
    private final SessionServiceFactory sessionServiceFactory;
    private final SerializableFunction<T, Instant> timestampFn;
    private final SerializableFunction<BytesXMLMessage, T> parseFn;

    /** Returns the queue this source reads from. */
    public Queue getQueue() {
        return queue;
    }

    /** Returns the factory for the session service this source was configured with. */
    public SessionServiceFactory getSessionServiceFactory() {
        return sessionServiceFactory;
    }

    /** Returns the factory for the SEMP client this source was configured with. */
    public SempClientFactory getSempClientFactory() {
        return sempClientFactory;
    }

    /** Returns the function that extracts a timestamp from a parsed element. */
    public SerializableFunction<T, Instant> getTimestampFn() {
        return timestampFn;
    }

    /** Returns the function that converts a raw Solace message into an element of type {@code T}. */
    public SerializableFunction<BytesXMLMessage, T> getParseFn() {
        return parseFn;
    }

    /**
     * Creates a source for the given queue.
     *
     * @param queue the queue to read from
     * @param sempClientFactory factory for the SEMP client, used when splitting to check whether
     *     the queue is non-exclusive
     * @param sessionServiceFactory factory for the session service, exposed to readers through
     *     {@link #getSessionServiceFactory()}
     * @param maxNumConnections upper bound on the number of splits, or {@code null} for no bound
     * @param enableDeduplication the value reported by {@link #requiresDeduping()}
     * @param coder the coder for the output elements
     * @param timestampFn extracts a timestamp from a parsed element
     * @param parseFn converts a raw Solace message into an element
     */
    public UnboundedSolaceSource(
            Queue queue,
            SempClientFactory sempClientFactory,
            SessionServiceFactory sessionServiceFactory,
            Integer maxNumConnections,
            boolean enableDeduplication,
            Coder<T> coder,
            SerializableFunction<T, Instant> timestampFn,
            SerializableFunction<BytesXMLMessage, T> parseFn) {
        this.queue = queue;
        this.sempClientFactory = sempClientFactory;
        this.sessionServiceFactory = sessionServiceFactory;
        this.maxNumConnections = maxNumConnections;
        this.enableDeduplication = enableDeduplication;
        this.coder = coder;
        this.timestampFn = timestampFn;
        this.parseFn = parseFn;
    }

    /**
     * Creates a reader bound to this source.
     *
     * <p>Neither argument is used: the reader is always constructed from this source alone.
     *
     * @param pipelineOptions unused
     * @param solaceCheckpointMark unused
     * @return a new reader for this source
     */
    @Override
    public UnboundedSource.UnboundedReader<T> createReader(
            PipelineOptions pipelineOptions, SolaceCheckpointMark solaceCheckpointMark) {
        return new UnboundedSolaceReader<>(this);
    }

    /**
     * Splits this source into identically configured sources.
     *
     * <p>The SEMP client is asked whether the queue is non-exclusive. If it is, the number of
     * splits is bounded by {@code maxNumConnections}; otherwise a warning is logged and the bound
     * is 1.
     *
     * @param desiredNumSplits the number of splits requested by the caller
     * @param pipelineOptions unused
     * @return the list of sources to read from
     * @throws IOException declared by this method's signature; presumably raised by the SEMP
     *     lookup (confirm against the SEMP client)
     */
    @Override
    public List<UnboundedSolaceSource<T>> split(int desiredNumSplits, PipelineOptions pipelineOptions)
            throws IOException {
        boolean queueNonExclusive = sempClientFactory.create().isQueueNonExclusive(queue.getName());
        if (queueNonExclusive) {
            return getSolaceSources(desiredNumSplits, maxNumConnections);
        }
        LOG.warn("SolaceIO.Read: The queue {} is exclusive. Provisioning only 1 read client.", queue);
        return getSolaceSources(desiredNumSplits, 1);
    }

    /**
     * Builds the list of split sources.
     *
     * @param desiredNumSplits the number of splits requested
     * @param connectionLimit upper bound on the number of splits, or {@code null} for no bound;
     *     this value is also passed on to every created source as its own connection limit
     * @return {@code min(desiredNumSplits, connectionLimit)} sources, or {@code desiredNumSplits}
     *     sources when no limit is given; empty when that count is not positive
     */
    private List<UnboundedSolaceSource<T>> getSolaceSources(
            int desiredNumSplits, Integer connectionLimit) {
        int numSplits =
                connectionLimit != null
                        ? Math.min(desiredNumSplits, connectionLimit)
                        : desiredNumSplits;
        LOG.info("SolaceIO.Read: UnboundedSolaceSource: creating {} read connections.", numSplits);

        List<UnboundedSolaceSource<T>> sources = new ArrayList<>();
        for (int split = 0; split < numSplits; split++) {
            sources.add(
                    new UnboundedSolaceSource<>(
                            queue,
                            sempClientFactory,
                            sessionServiceFactory,
                            connectionLimit,
                            enableDeduplication,
                            coder,
                            timestampFn,
                            parseFn));
        }
        return sources;
    }

    /** Returns an Avro-based coder for {@link SolaceCheckpointMark}. */
    @Override
    public Coder<SolaceCheckpointMark> getCheckpointMarkCoder() {
        return AvroCoder.of(SolaceCheckpointMark.class);
    }

    /** Returns the coder supplied at construction time for the output elements. */
    @Override
    public Coder<T> getOutputCoder() {
        return coder;
    }

    /** Returns the {@code enableDeduplication} flag supplied at construction time. */
    @Override
    public boolean requiresDeduping() {
        return enableDeduplication;
    }
}
