package org.apache.beam.sdk.io.rabbitmq;

import com.google.auto.value.AutoValue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeoutException;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.rabbitmq.AutoValue_RabbitMqIO_Read;
import org.apache.beam.sdk.io.rabbitmq.AutoValue_RabbitMqIO_Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.class */
public class RabbitMqIO {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIO$ConnectionHandler.class */
    /**
     * Owns a single RabbitMQ {@link Connection} and the single {@link Channel} opened on it.
     *
     * <p>The {@link ConnectionFactory} is configured once in the constructor. {@link #start()}
     * opens the connection and channel; {@link #stop()} closes both.
     */
    public static class ConnectionHandler {
        private final ConnectionFactory connectionFactory;
        private Connection connection;
        private Channel channel;

        /**
         * Builds and configures the connection factory; no network connection is opened here.
         *
         * @param str the broker URI handed to {@link ConnectionFactory#setUri(String)}
         * @throws URISyntaxException if the factory rejects the URI
         * @throws NoSuchAlgorithmException propagated from {@code setUri}
         * @throws KeyManagementException propagated from {@code setUri}
         */
        public ConnectionHandler(String str) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri(str);
            // Recovery, timeout and limit settings are passed straight to the RabbitMQ client;
            // see the ConnectionFactory documentation for the unit of each value.
            factory.setAutomaticRecoveryEnabled(true);
            factory.setConnectionTimeout(60000);
            factory.setNetworkRecoveryInterval(5000);
            factory.setRequestedHeartbeat(60);
            factory.setTopologyRecoveryEnabled(true);
            factory.setRequestedChannelMax(0);
            factory.setRequestedFrameMax(0);
            this.connectionFactory = factory;
        }

        /**
         * Opens a new connection and creates a channel on it.
         *
         * @throws TimeoutException propagated from the connection attempt
         * @throws IOException if connecting fails or the client returns no channel
         */
        public void start() throws TimeoutException, IOException {
            // The connection field is assigned before the channel is requested so that stop()
            // can still close the connection if channel creation fails.
            this.connection = this.connectionFactory.newConnection();
            this.channel = this.connection.createChannel();
            if (this.channel != null) {
                return;
            }
            throw new IOException("No RabbitMQ channel available");
        }

        /** Returns the channel created by {@link #start()}, or {@code null} before it is called. */
        public Channel getChannel() {
            return this.channel;
        }

        /**
         * Closes the channel (best effort) and then the connection.
         *
         * @throws IOException if closing the connection fails
         */
        public void stop() throws IOException {
            Channel openChannel = this.channel;
            if (openChannel != null) {
                try {
                    openChannel.close();
                } catch (Exception ignored) {
                    // Best effort: a failure to close the channel must not prevent the
                    // connection below from being closed.
                }
            }
            Connection openConnection = this.connection;
            if (openConnection != null) {
                openConnection.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIO$RabbitMQCheckpointMark.class */
    public static class RabbitMQCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
        transient Channel channel;
        Instant latestTimestamp;
        final List<Long> sessionIds;

        private RabbitMQCheckpointMark() {
            this.latestTimestamp = Instant.now();
            this.sessionIds = new ArrayList();
        }

        public void advanceWatermark(Instant instant) {
            if (instant.isAfter(this.latestTimestamp)) {
                this.latestTimestamp = instant;
            }
        }

        public void finalizeCheckpoint() throws IOException {
            Iterator<Long> it = this.sessionIds.iterator();
            while (it.hasNext()) {
                this.channel.basicAck(it.next().longValue(), false);
            }
            this.channel.txCommit();
            this.latestTimestamp = Instant.now();
            this.sessionIds.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIO$RabbitMQSource.class */
    public static class RabbitMQSource extends UnboundedSource<RabbitMqMessage, RabbitMQCheckpointMark> {
        final Read spec;

        RabbitMQSource(Read read) {
            this.spec = read;
        }

        public Coder<RabbitMqMessage> getOutputCoder() {
            return SerializableCoder.of(RabbitMqMessage.class);
        }

        public List<RabbitMQSource> split(int i, PipelineOptions pipelineOptions) {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(this);
            }
            return arrayList;
        }

        public UnboundedSource.UnboundedReader<RabbitMqMessage> createReader(PipelineOptions pipelineOptions, RabbitMQCheckpointMark rabbitMQCheckpointMark) throws IOException {
            return new UnboundedRabbitMqReader(this, rabbitMQCheckpointMark);
        }

        public Coder<RabbitMQCheckpointMark> getCheckpointMarkCoder() {
            return SerializableCoder.of(RabbitMQCheckpointMark.class);
        }

        public boolean requiresDeduping() {
            return this.spec.useCorrelationId();
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIO$Read.class */
    public static abstract class Read extends PTransform<PBegin, PCollection<RabbitMqMessage>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIO$Read$Builder.class */
        public static abstract class Builder {
            abstract Builder setUri(String str);

            abstract Builder setQueue(String str);

            abstract Builder setQueueDeclare(boolean z);

            abstract Builder setExchange(String str);

            abstract Builder setExchangeType(String str);

            abstract Builder setExchangeDeclare(boolean z);

            abstract Builder setRoutingKey(String str);

            abstract Builder setUseCorrelationId(boolean z);

            abstract Builder setMaxNumRecords(long j);

            abstract Builder setMaxReadTime(Duration duration);

            abstract Read build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String uri();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String queue();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract boolean queueDeclare();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String exchange();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String exchangeType();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract boolean exchangeDeclare();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String routingKey();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract boolean useCorrelationId();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract long maxNumRecords();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Duration maxReadTime();

        abstract Builder builder();

        public Read withUri(String str) {
            Preconditions.checkArgument(str != null, "uri can not be null");
            return builder().setUri(str).build();
        }

        public Read withQueue(String str) {
            Preconditions.checkArgument(str != null, "queue can not be null");
            return builder().setQueue(str).build();
        }

        public Read withQueueDeclare(boolean z) {
            return builder().setQueueDeclare(z).build();
        }

        public Read withExchange(String str, String str2, String str3) {
            Preconditions.checkArgument(str != null, "exchange name can not be null");
            Preconditions.checkArgument(str2 != null, "exchange type can not be null");
            return builder().setExchange(str).setExchangeType(str2).setRoutingKey(str3).setExchangeDeclare(true).build();
        }

        public Read withExchange(String str, String str2) {
            Preconditions.checkArgument(str != null, "exchange name can not be null");
            return builder().setExchange(str).setExchangeDeclare(false).setRoutingKey(str2).build();
        }

        public Read withMaxNumRecords(long j) {
            Preconditions.checkArgument(maxReadTime() == null, "maxNumRecord and maxReadTime are exclusive");
            return builder().setMaxNumRecords(j).build();
        }

        public Read withMaxReadTime(Duration duration) {
            Preconditions.checkArgument(maxNumRecords() == Long.MAX_VALUE, "maxNumRecord and maxReadTime are exclusive");
            return builder().setMaxReadTime(duration).build();
        }

        public Read withUseCorrelationId(boolean z) {
            return builder().setUseCorrelationId(z).build();
        }

        public PCollection<RabbitMqMessage> expand(PBegin pBegin) {
            PTransform from = org.apache.beam.sdk.io.Read.from(new RabbitMQSource(this));
            PTransform pTransform = from;
            if (maxNumRecords() < Long.MAX_VALUE || maxReadTime() != null) {
                pTransform = from.withMaxReadTime(maxReadTime()).withMaxNumRecords(maxNumRecords());
            }
            return pBegin.getPipeline().apply(pTransform);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIO$UnboundedRabbitMqReader.class */
    public static class UnboundedRabbitMqReader extends UnboundedSource.UnboundedReader<RabbitMqMessage> {
        private final RabbitMQSource source;
        private RabbitMqMessage current = null;
        private byte[] currentRecordId;
        private ConnectionHandler connectionHandler;
        private String queueName;
        private Instant currentTimestamp;
        private final RabbitMQCheckpointMark checkpointMark;

        UnboundedRabbitMqReader(RabbitMQSource rabbitMQSource, RabbitMQCheckpointMark rabbitMQCheckpointMark) throws IOException {
            this.source = rabbitMQSource;
            this.checkpointMark = rabbitMQCheckpointMark != null ? rabbitMQCheckpointMark : new RabbitMQCheckpointMark();
        }

        public Instant getWatermark() {
            return this.checkpointMark.latestTimestamp;
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return this.checkpointMark;
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public RabbitMQSource m1getCurrentSource() {
            return this.source;
        }

        public byte[] getCurrentRecordId() {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.currentRecordId != null ? this.currentRecordId : "".getBytes(StandardCharsets.UTF_8);
        }

        public Instant getCurrentTimestamp() {
            if (this.currentTimestamp == null) {
                throw new NoSuchElementException();
            }
            return this.currentTimestamp;
        }

        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        public RabbitMqMessage m2getCurrent() {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        public boolean start() throws IOException {
            try {
                this.connectionHandler = new ConnectionHandler(this.source.spec.uri());
                this.connectionHandler.start();
                Channel channel = this.connectionHandler.getChannel();
                this.queueName = this.source.spec.queue();
                if (this.source.spec.queueDeclare()) {
                    channel.queueDeclare(this.queueName, false, false, false, (Map) null);
                }
                if (this.source.spec.exchange() != null) {
                    if (this.source.spec.exchangeDeclare()) {
                        channel.exchangeDeclare(this.source.spec.exchange(), this.source.spec.exchangeType());
                    }
                    if (this.queueName == null) {
                        this.queueName = channel.queueDeclare().getQueue();
                    }
                    channel.queueBind(this.queueName, this.source.spec.exchange(), this.source.spec.routingKey());
                }
                this.checkpointMark.channel = channel;
                channel.txSelect();
                return advance();
            } catch (IOException e) {
                throw e;
            } catch (Exception e2) {
                throw new IOException(e2);
            }
        }

        public boolean advance() throws IOException {
            try {
                GetResponse basicGet = this.connectionHandler.getChannel().basicGet(this.queueName, false);
                if (basicGet == null) {
                    this.current = null;
                    this.currentRecordId = null;
                    this.currentTimestamp = null;
                    this.checkpointMark.advanceWatermark(Instant.now());
                    return false;
                }
                if (this.source.spec.useCorrelationId()) {
                    String correlationId = basicGet.getProps().getCorrelationId();
                    if (correlationId == null) {
                        throw new IOException("RabbitMqIO.Read uses message correlation ID, but received message has a null correlation ID");
                    }
                    this.currentRecordId = correlationId.getBytes(StandardCharsets.UTF_8);
                }
                this.checkpointMark.sessionIds.add(Long.valueOf(basicGet.getEnvelope().getDeliveryTag()));
                this.current = new RabbitMqMessage(this.source.spec.routingKey(), basicGet);
                Date timestamp = basicGet.getProps().getTimestamp();
                this.currentTimestamp = timestamp != null ? new Instant(timestamp) : Instant.now();
                this.checkpointMark.advanceWatermark(this.currentTimestamp);
                return true;
            } catch (IOException e) {
                throw e;
            } catch (Exception e2) {
                throw new IOException(e2);
            }
        }

        public void close() throws IOException {
            if (this.connectionHandler != null) {
                this.connectionHandler.stop();
            }
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIO$Write.class */
    public static abstract class Write extends PTransform<PCollection<RabbitMqMessage>, PCollection<?>> {

        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIO$Write$Builder.class */
        static abstract class Builder {
            abstract Builder setUri(String str);

            abstract Builder setExchange(String str);

            abstract Builder setExchangeType(String str);

            abstract Builder setExchangeDeclare(boolean z);

            abstract Builder setQueue(String str);

            abstract Builder setQueueDeclare(boolean z);

            abstract Write build();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/rabbitmq/RabbitMqIO$Write$WriteFn.class */
        public static class WriteFn extends DoFn<RabbitMqMessage, Void> {
            private final Write spec;
            private transient ConnectionHandler connectionHandler;

            WriteFn(Write write) {
                this.spec = write;
            }

            @DoFn.Setup
            public void setup() throws Exception {
                this.connectionHandler = new ConnectionHandler(this.spec.uri());
                this.connectionHandler.start();
                Channel channel = this.connectionHandler.getChannel();
                if (this.spec.exchange() != null && this.spec.exchangeDeclare()) {
                    channel.exchangeDeclare(this.spec.exchange(), this.spec.exchangeType());
                }
                if (this.spec.queue() == null || !this.spec.queueDeclare()) {
                    return;
                }
                channel.queueDeclare(this.spec.queue(), true, false, false, (Map) null);
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<RabbitMqMessage, Void>.ProcessContext processContext) throws IOException {
                RabbitMqMessage rabbitMqMessage = (RabbitMqMessage) processContext.element();
                Channel channel = this.connectionHandler.getChannel();
                if (this.spec.exchange() != null) {
                    channel.basicPublish(this.spec.exchange(), rabbitMqMessage.getRoutingKey(), rabbitMqMessage.createProperties(), rabbitMqMessage.getBody());
                }
                if (this.spec.queue() != null) {
                    channel.basicPublish("", this.spec.queue(), MessageProperties.PERSISTENT_TEXT_PLAIN, rabbitMqMessage.getBody());
                }
            }

            @DoFn.Teardown
            public void teardown() throws Exception {
                if (this.connectionHandler != null) {
                    this.connectionHandler.stop();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String uri();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String exchange();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String exchangeType();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract boolean exchangeDeclare();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String queue();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract boolean queueDeclare();

        abstract Builder builder();

        public Write withUri(String str) {
            Preconditions.checkArgument(str != null, "uri can not be null");
            return builder().setUri(str).build();
        }

        public Write withExchange(String str, String str2) {
            Preconditions.checkArgument(str != null, "exchange can not be null");
            Preconditions.checkArgument(str2 != null, "exchangeType can not be null");
            return builder().setExchange(str).setExchangeType(str2).setExchangeDeclare(true).build();
        }

        public Write withExchange(String str) {
            Preconditions.checkArgument(str != null, "exchange can not be null");
            return builder().setExchange(str).setExchangeDeclare(false).build();
        }

        public Write withQueue(String str) {
            Preconditions.checkArgument(str != null, "queue can not be null");
            return builder().setQueue(str).build();
        }

        public Write withQueueDeclare(boolean z) {
            return builder().setQueueDeclare(z).build();
        }

        public PCollection<?> expand(PCollection<RabbitMqMessage> pCollection) {
            Preconditions.checkArgument((exchange() == null && queue() == null) ? false : true, "Either exchange or queue has to be specified");
            if (exchange() != null) {
                Preconditions.checkArgument(queue() == null, "Queue can't be set in the same time as exchange");
            }
            if (queue() != null) {
                Preconditions.checkArgument(exchange() == null, "Exchange can't be set in the same time as queue");
            }
            if (queueDeclare()) {
                Preconditions.checkArgument(queue() != null, "Queue is required for the queue declare");
            }
            if (exchangeDeclare()) {
                Preconditions.checkArgument(exchange() != null, "Exchange is required for the exchange declare");
            }
            return pCollection.apply(ParDo.of(new WriteFn(this)));
        }
    }

    /**
     * Creates a {@link Read} with defaults: no queue or exchange declaration, no read-time
     * limit, no record limit ({@code Long.MAX_VALUE}) and correlation-id handling disabled.
     */
    public static Read read() {
        Read.Builder defaults = new AutoValue_RabbitMqIO_Read.Builder()
                .setQueueDeclare(false)
                .setExchangeDeclare(false)
                .setMaxReadTime(null)
                .setMaxNumRecords(Long.MAX_VALUE)
                .setUseCorrelationId(false);
        return defaults.build();
    }

    /** Creates a {@link Write} with exchange and queue declaration both disabled. */
    public static Write write() {
        Write.Builder defaults = new AutoValue_RabbitMqIO_Write.Builder()
                .setExchangeDeclare(false)
                .setQueueDeclare(false);
        return defaults.build();
    }

    /** Not instantiable: this class only exposes static factories and nested types. */
    private RabbitMqIO() {}
}
