package org.apache.beam.sdk.io.jms;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.jms.AutoValue_JmsIO_Read;
import org.apache.beam.sdk.io.jms.AutoValue_JmsIO_Write;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO.class */
public class JmsIO {
    private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.millis(60000);

    @FunctionalInterface
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$MessageMapper.class */
    public interface MessageMapper<T> extends Serializable {
        T mapMessage(Message message) throws Exception;
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Read.class */
    public static abstract class Read<T> extends PTransform<PBegin, PCollection<T>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Read$Builder.class */
        public static abstract class Builder<T> {
            abstract Builder<T> setConnectionFactory(ConnectionFactory connectionFactory);

            abstract Builder<T> setQueue(String str);

            abstract Builder<T> setTopic(String str);

            abstract Builder<T> setUsername(String str);

            abstract Builder<T> setPassword(String str);

            abstract Builder<T> setMaxNumRecords(long j);

            abstract Builder<T> setMaxReadTime(Duration duration);

            abstract Builder<T> setMessageMapper(MessageMapper<T> messageMapper);

            abstract Builder<T> setCoder(Coder<T> coder);

            abstract Builder<T> setAutoScaler(AutoScaler autoScaler);

            abstract Builder<T> setCloseTimeout(Duration duration);

            abstract Read<T> build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ConnectionFactory getConnectionFactory();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getQueue();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getTopic();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getUsername();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getPassword();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract long getMaxNumRecords();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Duration getMaxReadTime();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract MessageMapper<T> getMessageMapper();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Coder<T> getCoder();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract AutoScaler getAutoScaler();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Duration getCloseTimeout();

        abstract Builder<T> builder();

        public Read<T> withConnectionFactory(ConnectionFactory connectionFactory) {
            Preconditions.checkArgument(connectionFactory != null, "connectionFactory can not be null");
            return builder().setConnectionFactory(connectionFactory).build();
        }

        public Read<T> withQueue(String str) {
            Preconditions.checkArgument(str != null, "queue can not be null");
            return builder().setQueue(str).build();
        }

        public Read<T> withTopic(String str) {
            Preconditions.checkArgument(str != null, "topic can not be null");
            return builder().setTopic(str).build();
        }

        public Read<T> withUsername(String str) {
            Preconditions.checkArgument(str != null, "username can not be null");
            return builder().setUsername(str).build();
        }

        public Read<T> withPassword(String str) {
            Preconditions.checkArgument(str != null, "password can not be null");
            return builder().setPassword(str).build();
        }

        public Read<T> withMaxNumRecords(long j) {
            Preconditions.checkArgument(j >= 0, "maxNumRecords must be > 0, but was: %s", j);
            return builder().setMaxNumRecords(j).build();
        }

        public Read<T> withMaxReadTime(Duration duration) {
            Preconditions.checkArgument(duration != null, "maxReadTime can not be null");
            return builder().setMaxReadTime(duration).build();
        }

        public Read<T> withMessageMapper(MessageMapper<T> messageMapper) {
            Preconditions.checkArgument(messageMapper != null, "messageMapper can not be null");
            return builder().setMessageMapper(messageMapper).build();
        }

        public Read<T> withCoder(Coder<T> coder) {
            Preconditions.checkArgument(coder != null, "coder can not be null");
            return builder().setCoder(coder).build();
        }

        public Read<T> withAutoScaler(AutoScaler autoScaler) {
            Preconditions.checkArgument(autoScaler != null, "autoScaler can not be null");
            return builder().setAutoScaler(autoScaler).build();
        }

        public Read<T> withCloseTimeout(Duration duration) {
            Preconditions.checkArgument(duration != null, "closeTimeout can not be null");
            Preconditions.checkArgument(duration.getMillis() >= 0, "Close timeout must be non-negative.");
            return builder().setCloseTimeout(duration).build();
        }

        public PCollection<T> expand(PBegin pBegin) {
            Preconditions.checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
            Preconditions.checkArgument((getQueue() == null && getTopic() == null) ? false : true, "Either withQueue() or withTopic() is required");
            Preconditions.checkArgument(getQueue() == null || getTopic() == null, "withQueue() and withTopic() are exclusive");
            Preconditions.checkArgument(getMessageMapper() != null, "withMessageMapper() is required");
            Preconditions.checkArgument(getCoder() != null, "withCoder() is required");
            PTransform from = org.apache.beam.sdk.io.Read.from(createSource());
            PTransform pTransform = from;
            if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
                pTransform = from.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
            }
            return pBegin.getPipeline().apply(pTransform);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("queue", getQueue()));
            builder.addIfNotNull(DisplayData.item("topic", getTopic()));
        }

        UnboundedSource<T, JmsCheckpointMark> createSource() {
            return new UnboundedJmsSource(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$UnboundedJmsReader.class */
    public static class UnboundedJmsReader<T> extends UnboundedSource.UnboundedReader<T> {
        private UnboundedJmsSource<T> source;
        private Connection connection;
        private Session session;
        private MessageConsumer consumer;
        private AutoScaler autoScaler;
        private Instant currentTimestamp;
        private PipelineOptions options;
        private JmsCheckpointMark checkpointMark = new JmsCheckpointMark();
        private T currentMessage = null;

        public UnboundedJmsReader(UnboundedJmsSource<T> unboundedJmsSource, PipelineOptions pipelineOptions) {
            this.source = unboundedJmsSource;
            this.options = pipelineOptions;
        }

        public boolean start() throws IOException {
            Read read = ((UnboundedJmsSource) this.source).spec;
            ConnectionFactory connectionFactory = read.getConnectionFactory();
            try {
                Connection createConnection = read.getUsername() != null ? connectionFactory.createConnection(read.getUsername(), read.getPassword()) : connectionFactory.createConnection();
                createConnection.start();
                this.connection = createConnection;
                if (read.getAutoScaler() == null) {
                    this.autoScaler = new DefaultAutoscaler();
                } else {
                    this.autoScaler = read.getAutoScaler();
                }
                this.autoScaler.start();
                try {
                    this.session = this.connection.createSession(false, 2);
                    try {
                        if (read.getTopic() != null) {
                            this.consumer = this.session.createConsumer(this.session.createTopic(read.getTopic()));
                        } else {
                            this.consumer = this.session.createConsumer(this.session.createQueue(read.getQueue()));
                        }
                        return advance();
                    } catch (Exception e) {
                        throw new IOException("Error creating JMS consumer", e);
                    }
                } catch (Exception e2) {
                    throw new IOException("Error creating JMS session", e2);
                }
            } catch (Exception e3) {
                throw new IOException("Error connecting to JMS", e3);
            }
        }

        public boolean advance() throws IOException {
            try {
                Message receiveNoWait = this.consumer.receiveNoWait();
                if (receiveNoWait == null) {
                    this.currentMessage = null;
                    return false;
                }
                this.checkpointMark.add(receiveNoWait);
                this.currentMessage = ((UnboundedJmsSource) this.source).spec.getMessageMapper().mapMessage(receiveNoWait);
                this.currentTimestamp = new Instant(receiveNoWait.getJMSTimestamp());
                return true;
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        public T getCurrent() throws NoSuchElementException {
            if (this.currentMessage == null) {
                throw new NoSuchElementException();
            }
            return this.currentMessage;
        }

        public Instant getWatermark() {
            return this.checkpointMark.getOldestMessageTimestamp();
        }

        public Instant getCurrentTimestamp() {
            if (this.currentMessage == null) {
                throw new NoSuchElementException();
            }
            return this.currentTimestamp;
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return this.checkpointMark;
        }

        public long getTotalBacklogBytes() {
            return this.autoScaler.getTotalBacklogBytes();
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
        public UnboundedSource<T, ?> m2getCurrentSource() {
            return this.source;
        }

        public void close() {
            doClose();
        }

        private void doClose() {
            try {
                closeAutoscaler();
                closeConsumer();
                this.options.as(ExecutorOptions.class).getScheduledExecutorService().schedule(() -> {
                    JmsIO.LOG.debug("Closing session and connection after delay {}", ((UnboundedJmsSource) this.source).spec.getCloseTimeout());
                    this.checkpointMark.discard();
                    closeSession();
                    closeConnection();
                }, ((UnboundedJmsSource) this.source).spec.getCloseTimeout().getMillis(), TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                JmsIO.LOG.error("Error closing reader", e);
            }
        }

        private void closeConnection() {
            try {
                if (this.connection != null) {
                    this.connection.stop();
                    this.connection.close();
                    this.connection = null;
                }
            } catch (Exception e) {
                JmsIO.LOG.error("Error closing connection", e);
            }
        }

        private void closeSession() {
            try {
                if (this.session != null) {
                    this.session.close();
                    this.session = null;
                }
            } catch (Exception e) {
                JmsIO.LOG.error("Error closing session" + e.getMessage(), e);
            }
        }

        private void closeConsumer() {
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                    this.consumer = null;
                }
            } catch (Exception e) {
                JmsIO.LOG.error("Error closing consumer", e);
            }
        }

        private void closeAutoscaler() {
            try {
                if (this.autoScaler != null) {
                    this.autoScaler.stop();
                    this.autoScaler = null;
                }
            } catch (Exception e) {
                JmsIO.LOG.error("Error closing autoscaler", e);
            }
        }

        protected void finalize() {
            doClose();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$UnboundedJmsSource.class */
    public static class UnboundedJmsSource<T> extends UnboundedSource<T, JmsCheckpointMark> {
        private final Read<T> spec;

        public UnboundedJmsSource(Read<T> read) {
            this.spec = read;
        }

        public List<UnboundedJmsSource<T>> split(int i, PipelineOptions pipelineOptions) throws Exception {
            ArrayList arrayList = new ArrayList();
            if (this.spec.getTopic() != null) {
                arrayList.add(new UnboundedJmsSource(this.spec));
            } else {
                for (int i2 = 0; i2 < i; i2++) {
                    arrayList.add(new UnboundedJmsSource(this.spec));
                }
            }
            return arrayList;
        }

        public UnboundedJmsReader<T> createReader(PipelineOptions pipelineOptions, JmsCheckpointMark jmsCheckpointMark) {
            return new UnboundedJmsReader<>(this, pipelineOptions);
        }

        public Coder<JmsCheckpointMark> getCheckpointMarkCoder() {
            return SerializableCoder.of(JmsCheckpointMark.class);
        }

        public Coder<T> getOutputCoder() {
            return this.spec.getCoder();
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Write.class */
    public static abstract class Write<EventT> extends PTransform<PCollection<EventT>, WriteJmsResult<EventT>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Write$Builder.class */
        public static abstract class Builder<EventT> {
            abstract Builder<EventT> setConnectionFactory(ConnectionFactory connectionFactory);

            abstract Builder<EventT> setQueue(String str);

            abstract Builder<EventT> setTopic(String str);

            abstract Builder<EventT> setUsername(String str);

            abstract Builder<EventT> setPassword(String str);

            abstract Builder<EventT> setValueMapper(SerializableBiFunction<EventT, Session, Message> serializableBiFunction);

            abstract Builder<EventT> setTopicNameMapper(SerializableFunction<EventT, String> serializableFunction);

            abstract Builder<EventT> setRetryConfiguration(RetryConfiguration retryConfiguration);

            abstract Write<EventT> build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ConnectionFactory getConnectionFactory();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getQueue();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getTopic();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getUsername();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getPassword();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SerializableBiFunction<EventT, Session, Message> getValueMapper();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SerializableFunction<EventT, String> getTopicNameMapper();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract RetryConfiguration getRetryConfiguration();

        abstract Builder<EventT> builder();

        public Write<EventT> withConnectionFactory(ConnectionFactory connectionFactory) {
            Preconditions.checkArgument(connectionFactory != null, "connectionFactory can not be null");
            return builder().setConnectionFactory(connectionFactory).build();
        }

        public Write<EventT> withQueue(String str) {
            Preconditions.checkArgument(str != null, "queue can not be null");
            return builder().setQueue(str).build();
        }

        public Write<EventT> withTopic(String str) {
            Preconditions.checkArgument(str != null, "topic can not be null");
            return builder().setTopic(str).build();
        }

        public Write<EventT> withUsername(String str) {
            Preconditions.checkArgument(str != null, "username can not be null");
            return builder().setUsername(str).build();
        }

        public Write<EventT> withPassword(String str) {
            Preconditions.checkArgument(str != null, "password can not be null");
            return builder().setPassword(str).build();
        }

        public Write<EventT> withTopicNameMapper(SerializableFunction<EventT, String> serializableFunction) {
            Preconditions.checkArgument(serializableFunction != null, "topicNameMapper can not be null");
            return builder().setTopicNameMapper(serializableFunction).build();
        }

        public Write<EventT> withValueMapper(SerializableBiFunction<EventT, Session, Message> serializableBiFunction) {
            Preconditions.checkArgument(serializableBiFunction != null, "valueMapper can not be null");
            return builder().setValueMapper(serializableBiFunction).build();
        }

        public Write<EventT> withRetryConfiguration(RetryConfiguration retryConfiguration) {
            Preconditions.checkArgument(retryConfiguration != null, "retryConfiguration can not be null");
            return builder().setRetryConfiguration(retryConfiguration).build();
        }

        public WriteJmsResult<EventT> expand(PCollection<EventT> pCollection) {
            Preconditions.checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
            Preconditions.checkArgument((getTopicNameMapper() == null && getQueue() == null && getTopic() == null) ? false : true, "Either withTopicNameMapper(topicNameMapper), withQueue(queue), or withTopic(topic) is required");
            Preconditions.checkArgument(isExclusiveTopicQueue(), "Only one of withQueue(queue), withTopic(topic), or withTopicNameMapper(function) must be set.");
            Preconditions.checkArgument(getValueMapper() != null, "withValueMapper() is required");
            return (WriteJmsResult) pCollection.apply(new Writer(this));
        }

        private boolean isExclusiveTopicQueue() {
            Boolean[] boolArr = new Boolean[3];
            boolArr[0] = Boolean.valueOf(getQueue() != null);
            boolArr[1] = Boolean.valueOf(getTopic() != null);
            boolArr[2] = Boolean.valueOf(getTopicNameMapper() != null);
            return Stream.of((Object[]) boolArr).filter(bool -> {
                return bool.booleanValue();
            }).count() == 1;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Writer.class */
    public static class Writer<T> extends PTransform<PCollection<T>, WriteJmsResult<T>> {
        public static final String CONNECTION_ERRORS_METRIC_NAME = "connectionErrors";
        public static final String PUBLICATION_RETRIES_METRIC_NAME = "publicationRetries";
        public static final String JMS_IO_PRODUCER_METRIC_NAME = Writer.class.getCanonicalName();
        private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
        private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS";
        private final Write<T> spec;
        private final TupleTag<T> messagesTag = new TupleTag<>();
        private final TupleTag<T> failedMessagesTag = new TupleTag<>();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Writer$JmsConnection.class */
        public static class JmsConnection<T> implements Serializable {
            private static final long serialVersionUID = 1;
            private transient Session session;
            private transient Connection connection;
            private transient Destination destination;
            private transient MessageProducer producer;
            private final Write<T> spec;
            private final Counter connectionErrors = Metrics.counter(Writer.JMS_IO_PRODUCER_METRIC_NAME, Writer.CONNECTION_ERRORS_METRIC_NAME);

            JmsConnection(Write<T> write) {
                this.spec = write;
            }

            void connect() throws JMSException {
                if (this.producer == null) {
                    ConnectionFactory connectionFactory = this.spec.getConnectionFactory();
                    if (this.spec.getUsername() != null) {
                        this.connection = connectionFactory.createConnection(this.spec.getUsername(), this.spec.getPassword());
                    } else {
                        this.connection = connectionFactory.createConnection();
                    }
                    this.connection.setExceptionListener(jMSException -> {
                        this.connectionErrors.inc();
                    });
                    this.connection.start();
                    this.session = this.connection.createSession(false, 1);
                    if (this.spec.getQueue() != null) {
                        this.destination = this.session.createQueue(this.spec.getQueue());
                    } else if (this.spec.getTopic() != null) {
                        this.destination = this.session.createTopic(this.spec.getTopic());
                    }
                    startProducer();
                }
            }

            void publishMessage(T t) throws JMSException, JmsIOException {
                Topic topic = this.destination;
                try {
                    Message message = (Message) this.spec.getValueMapper().apply(t, this.session);
                    if (this.spec.getTopicNameMapper() != null) {
                        topic = this.session.createTopic((String) this.spec.getTopicNameMapper().apply(t));
                    }
                    this.producer.send(topic, message);
                } catch (JMSException | NullPointerException | JmsIOException e) {
                    if (!(e instanceof NullPointerException)) {
                        throw e;
                    }
                    throw new JmsIOException("An error occurred", e);
                }
            }

            void startProducer() throws JMSException {
                this.producer = this.session.createProducer((Destination) null);
            }

            void closeProducer() throws JMSException {
                if (this.producer != null) {
                    this.producer.close();
                    this.producer = null;
                }
            }

            void close() {
                try {
                    closeProducer();
                    if (this.session != null) {
                        this.session.close();
                    }
                    if (this.connection != null) {
                        this.connection.close();
                    }
                } catch (JMSException e) {
                    Writer.LOG.warn("The connection couldn't be closed", e);
                } finally {
                    this.session = null;
                    this.connection = null;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Writer$JmsIOProducerFn.class */
        public static class JmsIOProducerFn<T> extends DoFn<T, T> {
            private transient FluentBackoff retryBackOff;
            private final Write<T> spec;
            private final TupleTag<T> failedMessagesTags;
            private final JmsConnection<T> jmsConnection;
            private final Counter publicationRetries = Metrics.counter(Writer.JMS_IO_PRODUCER_METRIC_NAME, Writer.PUBLICATION_RETRIES_METRIC_NAME);

            JmsIOProducerFn(Write<T> write, TupleTag<T> tupleTag) {
                this.spec = write;
                this.failedMessagesTags = tupleTag;
                this.jmsConnection = new JmsConnection<>(write);
            }

            @DoFn.Setup
            public void setup() throws JMSException {
                this.jmsConnection.connect();
                RetryConfiguration retryConfiguration = (RetryConfiguration) MoreObjects.firstNonNull(this.spec.getRetryConfiguration(), RetryConfiguration.create());
                this.retryBackOff = FluentBackoff.DEFAULT.withInitialBackoff((Duration) org.apache.beam.sdk.util.Preconditions.checkStateNotNull(retryConfiguration.getInitialDuration())).withMaxCumulativeBackoff((Duration) org.apache.beam.sdk.util.Preconditions.checkStateNotNull(retryConfiguration.getMaxDuration())).withMaxRetries(retryConfiguration.getMaxAttempts());
            }

            @DoFn.StartBundle
            public void startBundle() throws JMSException {
                this.jmsConnection.startProducer();
            }

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element T t, DoFn<T, T>.ProcessContext processContext) {
                try {
                    publishMessage(t);
                } catch (JMSException | IOException | InterruptedException | JmsIOException e) {
                    Writer.LOG.error("Error while publishing the message", e);
                    processContext.output(this.failedMessagesTags, t);
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                }
            }

            private void publishMessage(T t) throws JMSException, JmsIOException, IOException, InterruptedException {
                Sleeper sleeper = Sleeper.DEFAULT;
                BackOff backoff = ((FluentBackoff) org.apache.beam.sdk.util.Preconditions.checkStateNotNull(this.retryBackOff)).backoff();
                while (true) {
                    try {
                        this.jmsConnection.publishMessage(t);
                        return;
                    } catch (JMSException | JmsIOException e) {
                        if (!BackOffUtils.next(sleeper, backoff)) {
                            throw e;
                        }
                        this.publicationRetries.inc();
                    }
                }
            }

            @DoFn.FinishBundle
            public void finishBundle() throws JMSException {
                this.jmsConnection.closeProducer();
            }

            @DoFn.Teardown
            public void tearDown() {
                this.jmsConnection.close();
            }
        }

        Writer(Write<T> write) {
            this.spec = write;
        }

        public WriteJmsResult<T> expand(PCollection<T> pCollection) {
            PCollectionTuple apply = pCollection.apply(PUBLISH_TO_JMS_STEP_NAME, ParDo.of(new JmsIOProducerFn(this.spec, this.failedMessagesTag)).withOutputTags(this.messagesTag, TupleTagList.of(this.failedMessagesTag)));
            PCollection coder = apply.get(this.failedMessagesTag).setCoder(pCollection.getCoder());
            apply.get(this.messagesTag).setCoder(pCollection.getCoder());
            return WriteJmsResult.in(pCollection.getPipeline(), this.failedMessagesTag, coder);
        }
    }

    public static Read<JmsRecord> read() {
        return new AutoValue_JmsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).setCoder(SerializableCoder.of(JmsRecord.class)).setCloseTimeout(DEFAULT_CLOSE_TIMEOUT).setMessageMapper(new MessageMapper<JmsRecord>() { // from class: org.apache.beam.sdk.io.jms.JmsIO.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.beam.sdk.io.jms.JmsIO.MessageMapper
            public JmsRecord mapMessage(Message message) throws Exception {
                TextMessage textMessage = (TextMessage) message;
                HashMap hashMap = new HashMap();
                Enumeration propertyNames = textMessage.getPropertyNames();
                while (propertyNames.hasMoreElements()) {
                    String str = (String) propertyNames.nextElement();
                    hashMap.put(str, textMessage.getObjectProperty(str));
                }
                return new JmsRecord(textMessage.getJMSMessageID(), textMessage.getJMSTimestamp(), textMessage.getJMSCorrelationID(), textMessage.getJMSReplyTo(), textMessage.getJMSDestination(), textMessage.getJMSDeliveryMode(), textMessage.getJMSRedelivered(), textMessage.getJMSType(), textMessage.getJMSExpiration(), textMessage.getJMSPriority(), hashMap, textMessage.getText());
            }
        }).build();
    }

    public static <T> Read<T> readMessage() {
        return new AutoValue_JmsIO_Read.Builder().setMaxNumRecords(Long.MAX_VALUE).setCloseTimeout(DEFAULT_CLOSE_TIMEOUT).build();
    }

    public static <EventT> Write<EventT> write() {
        return new AutoValue_JmsIO_Write.Builder().build();
    }

    private JmsIO() {
    }
}
