package org.apache.beam.sdk.io.jms;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.jms.AutoValue_JmsIO_Read;
import org.apache.beam.sdk.io.jms.AutoValue_JmsIO_Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO.class */
public class JmsIO {

    /**
     * Converts a JMS {@link Message} into a value of type {@code T}.
     *
     * <p>The interface extends {@link Serializable} and declares a single abstract method, so it
     * can be supplied as a lambda or as an anonymous class.
     *
     * @param <T> type of the value produced for each message
     */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$MessageMapper.class */
    public interface MessageMapper<T> extends Serializable {
        /**
         * Maps one JMS message to an output value.
         *
         * @param message the message to convert
         * @return the value derived from {@code message}
         * @throws Exception if the message cannot be converted
         */
        T mapMessage(Message message) throws Exception;
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Read.class */
    public static abstract class Read<T> extends PTransform<PBegin, PCollection<T>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Read$Builder.class */
        public static abstract class Builder<T> {
            abstract Builder<T> setConnectionFactory(ConnectionFactory connectionFactory);

            abstract Builder<T> setQueue(String str);

            abstract Builder<T> setTopic(String str);

            abstract Builder<T> setUsername(String str);

            abstract Builder<T> setPassword(String str);

            abstract Builder<T> setMaxNumRecords(long j);

            abstract Builder<T> setMaxReadTime(Duration duration);

            abstract Builder<T> setMessageMapper(MessageMapper<T> messageMapper);

            abstract Builder<T> setCoder(Coder<T> coder);

            abstract Read<T> build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ConnectionFactory getConnectionFactory();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getQueue();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getTopic();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getUsername();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getPassword();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract long getMaxNumRecords();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Duration getMaxReadTime();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract MessageMapper<T> getMessageMapper();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Coder<T> getCoder();

        abstract Builder<T> builder();

        public Read<T> withConnectionFactory(ConnectionFactory connectionFactory) {
            Preconditions.checkArgument(connectionFactory != null, "connectionFactory can not be null");
            return builder().setConnectionFactory(connectionFactory).build();
        }

        public Read<T> withQueue(String str) {
            Preconditions.checkArgument(str != null, "queue can not be null");
            return builder().setQueue(str).build();
        }

        public Read<T> withTopic(String str) {
            Preconditions.checkArgument(str != null, "topic can not be null");
            return builder().setTopic(str).build();
        }

        public Read<T> withUsername(String str) {
            Preconditions.checkArgument(str != null, "username can not be null");
            return builder().setUsername(str).build();
        }

        public Read<T> withPassword(String str) {
            Preconditions.checkArgument(str != null, "password can not be null");
            return builder().setPassword(str).build();
        }

        public Read<T> withMaxNumRecords(long j) {
            Preconditions.checkArgument(j >= 0, "maxNumRecords must be > 0, but was: %s", j);
            return builder().setMaxNumRecords(j).build();
        }

        public Read<T> withMaxReadTime(Duration duration) {
            Preconditions.checkArgument(duration != null, "maxReadTime can not be null");
            return builder().setMaxReadTime(duration).build();
        }

        public Read<T> withMessageMapper(MessageMapper<T> messageMapper) {
            Preconditions.checkArgument(messageMapper != null, "messageMapper can not be null");
            return builder().setMessageMapper(messageMapper).build();
        }

        public Read<T> withCoder(Coder<T> coder) {
            Preconditions.checkArgument(coder != null, "coder can not be null");
            return builder().setCoder(coder).build();
        }

        public PCollection<T> expand(PBegin pBegin) {
            Preconditions.checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
            Preconditions.checkArgument((getQueue() == null && getTopic() == null) ? false : true, "Either withQueue() or withTopic() is required");
            Preconditions.checkArgument(getQueue() == null || getTopic() == null, "withQueue() and withTopic() are exclusive");
            Preconditions.checkArgument(getMessageMapper() != null, "withMessageMapper() is required");
            Preconditions.checkArgument(getCoder() != null, "withCoder() is required");
            PTransform from = org.apache.beam.sdk.io.Read.from(createSource());
            PTransform pTransform = from;
            if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
                pTransform = from.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
            }
            return pBegin.getPipeline().apply(pTransform);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("queue", getQueue()));
            builder.addIfNotNull(DisplayData.item("topic", getTopic()));
        }

        UnboundedSource<T, JmsCheckpointMark> createSource() {
            return new UnboundedJmsSource(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$UnboundedJmsReader.class */
    public static class UnboundedJmsReader<T> extends UnboundedSource.UnboundedReader<T> {
        private UnboundedJmsSource<T> source;
        private JmsCheckpointMark checkpointMark;
        private Connection connection;
        private Session session;
        private MessageConsumer consumer;
        private T currentMessage;
        private Instant currentTimestamp;

        public UnboundedJmsReader(UnboundedJmsSource<T> unboundedJmsSource, JmsCheckpointMark jmsCheckpointMark) {
            this.source = unboundedJmsSource;
            if (jmsCheckpointMark != null) {
                this.checkpointMark = jmsCheckpointMark;
            } else {
                this.checkpointMark = new JmsCheckpointMark();
            }
            this.currentMessage = null;
        }

        public boolean start() throws IOException {
            Read read = ((UnboundedJmsSource) this.source).spec;
            ConnectionFactory connectionFactory = read.getConnectionFactory();
            try {
                Connection createConnection = read.getUsername() != null ? connectionFactory.createConnection(read.getUsername(), read.getPassword()) : connectionFactory.createConnection();
                createConnection.start();
                this.connection = createConnection;
                try {
                    this.session = this.connection.createSession(false, 2);
                    try {
                        if (read.getTopic() != null) {
                            this.consumer = this.session.createConsumer(this.session.createTopic(read.getTopic()));
                        } else {
                            this.consumer = this.session.createConsumer(this.session.createQueue(read.getQueue()));
                        }
                        return advance();
                    } catch (Exception e) {
                        throw new IOException("Error creating JMS consumer", e);
                    }
                } catch (Exception e2) {
                    throw new IOException("Error creating JMS session", e2);
                }
            } catch (Exception e3) {
                throw new IOException("Error connecting to JMS", e3);
            }
        }

        public boolean advance() throws IOException {
            try {
                Message receiveNoWait = this.consumer.receiveNoWait();
                if (receiveNoWait == null) {
                    this.currentMessage = null;
                    return false;
                }
                this.checkpointMark.add(receiveNoWait);
                this.currentMessage = ((UnboundedJmsSource) this.source).spec.getMessageMapper().mapMessage(receiveNoWait);
                this.currentTimestamp = new Instant(receiveNoWait.getJMSTimestamp());
                return true;
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        public T getCurrent() throws NoSuchElementException {
            if (this.currentMessage == null) {
                throw new NoSuchElementException();
            }
            return this.currentMessage;
        }

        public Instant getWatermark() {
            return this.checkpointMark.getOldestMessageTimestamp();
        }

        public Instant getCurrentTimestamp() {
            if (this.currentMessage == null) {
                throw new NoSuchElementException();
            }
            return this.currentTimestamp;
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return this.checkpointMark;
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
        public UnboundedSource<T, ?> m1getCurrentSource() {
            return this.source;
        }

        public void close() throws IOException {
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                    this.consumer = null;
                }
                if (this.session != null) {
                    this.session.close();
                    this.session = null;
                }
                if (this.connection != null) {
                    this.connection.stop();
                    this.connection.close();
                    this.connection = null;
                }
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$UnboundedJmsSource.class */
    public static class UnboundedJmsSource<T> extends UnboundedSource<T, JmsCheckpointMark> {
        private final Read<T> spec;

        public UnboundedJmsSource(Read<T> read) {
            this.spec = read;
        }

        public List<UnboundedJmsSource<T>> split(int i, PipelineOptions pipelineOptions) throws Exception {
            ArrayList arrayList = new ArrayList();
            if (this.spec.getTopic() != null) {
                arrayList.add(new UnboundedJmsSource(this.spec));
            } else {
                for (int i2 = 0; i2 < i; i2++) {
                    arrayList.add(new UnboundedJmsSource(this.spec));
                }
            }
            return arrayList;
        }

        public UnboundedJmsReader<T> createReader(PipelineOptions pipelineOptions, JmsCheckpointMark jmsCheckpointMark) {
            return new UnboundedJmsReader<>(this, jmsCheckpointMark);
        }

        public Coder<JmsCheckpointMark> getCheckpointMarkCoder() {
            return SerializableCoder.of(JmsCheckpointMark.class);
        }

        public Coder<T> getOutputCoder() {
            return this.spec.getCoder();
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Write.class */
    public static abstract class Write extends PTransform<PCollection<String>, PDone> {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Write$Builder.class */
        public static abstract class Builder {
            abstract Builder setConnectionFactory(ConnectionFactory connectionFactory);

            abstract Builder setQueue(String str);

            abstract Builder setTopic(String str);

            abstract Builder setUsername(String str);

            abstract Builder setPassword(String str);

            abstract Write build();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIO$Write$WriterFn.class */
        public static class WriterFn extends DoFn<String, Void> {
            private Write spec;
            private Connection connection;
            private Session session;
            private MessageProducer producer;

            public WriterFn(Write write) {
                this.spec = write;
            }

            @DoFn.Setup
            public void setup() throws Exception {
                if (this.producer == null) {
                    if (this.spec.getUsername() != null) {
                        this.connection = this.spec.getConnectionFactory().createConnection(this.spec.getUsername(), this.spec.getPassword());
                    } else {
                        this.connection = this.spec.getConnectionFactory().createConnection();
                    }
                    this.connection.start();
                    this.session = this.connection.createSession(false, 1);
                    this.producer = this.session.createProducer(this.spec.getQueue() != null ? this.session.createQueue(this.spec.getQueue()) : this.session.createTopic(this.spec.getTopic()));
                }
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<String, Void>.ProcessContext processContext) throws Exception {
                this.producer.send(this.session.createTextMessage((String) processContext.element()));
            }

            @DoFn.Teardown
            public void teardown() throws Exception {
                this.producer.close();
                this.producer = null;
                this.session.close();
                this.session = null;
                this.connection.stop();
                this.connection.close();
                this.connection = null;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ConnectionFactory getConnectionFactory();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getQueue();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getTopic();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getUsername();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getPassword();

        abstract Builder builder();

        public Write withConnectionFactory(ConnectionFactory connectionFactory) {
            Preconditions.checkArgument(connectionFactory != null, "connectionFactory can not be null");
            return builder().setConnectionFactory(connectionFactory).build();
        }

        public Write withQueue(String str) {
            Preconditions.checkArgument(str != null, "queue can not be null");
            return builder().setQueue(str).build();
        }

        public Write withTopic(String str) {
            Preconditions.checkArgument(str != null, "topic can not be null");
            return builder().setTopic(str).build();
        }

        public Write withUsername(String str) {
            Preconditions.checkArgument(str != null, "username can not be null");
            return builder().setUsername(str).build();
        }

        public Write withPassword(String str) {
            Preconditions.checkArgument(str != null, "password can not be null");
            return builder().setPassword(str).build();
        }

        public PDone expand(PCollection<String> pCollection) {
            Preconditions.checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
            Preconditions.checkArgument((getQueue() == null && getTopic() == null) ? false : true, "Either withQueue(queue) or withTopic(topic) is required");
            Preconditions.checkArgument(getQueue() == null || getTopic() == null, "withQueue(queue) and withTopic(topic) are exclusive");
            pCollection.apply(ParDo.of(new WriterFn(this)));
            return PDone.in(pCollection.getPipeline());
        }
    }

    /**
     * Creates a {@link Read} that produces {@link JmsRecord} elements.
     *
     * <p>The returned transform has no record limit ({@link Long#MAX_VALUE}), uses a
     * {@link SerializableCoder} for {@link JmsRecord}, and maps each message by copying its JMS
     * headers, all of its properties and its text into a {@link JmsRecord}. The mapper casts
     * every message to {@link TextMessage}, so other message kinds fail with a
     * {@link ClassCastException} at mapping time.
     *
     * @return a partially configured {@code Read}; connection factory and destination still
     *     have to be set
     */
    public static Read<JmsRecord> read() {
        return new AutoValue_JmsIO_Read.Builder<JmsRecord>()
                .setMaxNumRecords(Long.MAX_VALUE)
                .setCoder(SerializableCoder.of(JmsRecord.class))
                .setMessageMapper(new MessageMapper<JmsRecord>() {
                    @Override
                    public JmsRecord mapMessage(Message message) throws Exception {
                        TextMessage textMessage = (TextMessage) message;
                        HashMap<String, Object> properties = new HashMap<>();
                        // The JMS API declares a raw Enumeration; the names are cast to String below
                        // exactly as before.
                        @SuppressWarnings("unchecked")
                        Enumeration<String> propertyNames = textMessage.getPropertyNames();
                        while (propertyNames.hasMoreElements()) {
                            String propertyName = propertyNames.nextElement();
                            properties.put(propertyName, textMessage.getObjectProperty(propertyName));
                        }
                        return new JmsRecord(
                                textMessage.getJMSMessageID(),
                                textMessage.getJMSTimestamp(),
                                textMessage.getJMSCorrelationID(),
                                textMessage.getJMSReplyTo(),
                                textMessage.getJMSDestination(),
                                textMessage.getJMSDeliveryMode(),
                                textMessage.getJMSRedelivered(),
                                textMessage.getJMSType(),
                                textMessage.getJMSExpiration(),
                                textMessage.getJMSPriority(),
                                properties,
                                textMessage.getText());
                    }
                })
                .build();
    }

    /**
     * Creates a {@link Read} for a caller-chosen element type.
     *
     * <p>Only the record limit is preset (to {@link Long#MAX_VALUE}, i.e. no limit); no message
     * mapper or coder is set here, and {@link Read#expand} requires both.
     *
     * @param <T> element type the configured mapper will produce
     * @return a {@code Read} with only the record limit set
     */
    public static <T> Read<T> readMessage() {
        return new AutoValue_JmsIO_Read.Builder<T>().setMaxNumRecords(Long.MAX_VALUE).build();
    }

    /**
     * Creates a {@link Write} with no properties set.
     *
     * @return an unconfigured {@code Write}; the connection factory and a queue or topic have
     *     to be supplied before it can be expanded
     */
    public static Write write() {
        Write.Builder emptyBuilder = new AutoValue_JmsIO_Write.Builder();
        return emptyBuilder.build();
    }

    /** Private constructor: this class only exposes static factory methods and nested types. */
    private JmsIO() {
    }
}
