package org.apache.beam.sdk.io.amqp;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_io_amqp.com.google.common.base.Joiner;
import org.apache.beam.repackaged.beam_sdks_java_io_amqp.com.google.common.base.Preconditions;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.amqp.AutoValue_AmqpIO_Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.messenger.Messenger;
import org.apache.qpid.proton.messenger.Tracker;
import org.joda.time.Duration;
import org.joda.time.Instant;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/amqp/AmqpIO.class */
public class AmqpIO {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/amqp/AmqpIO$AmqpCheckpointMark.class */
    public static class AmqpCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
        private transient Messenger messenger;
        private transient List<Tracker> trackers = new ArrayList();

        public void finalizeCheckpoint() {
            Iterator<Tracker> it = this.trackers.iterator();
            while (it.hasNext()) {
                this.messenger.accept(it.next(), 0);
            }
            this.trackers.clear();
        }

        private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
            this.trackers = new ArrayList();
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/amqp/AmqpIO$Read.class */
    public static abstract class Read extends PTransform<PBegin, PCollection<Message>> {

        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/amqp/AmqpIO$Read$Builder.class */
        static abstract class Builder {
            abstract Builder setAddresses(List<String> list);

            abstract Builder setMaxNumRecords(long j);

            abstract Builder setMaxReadTime(Duration duration);

            abstract Read build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract List<String> addresses();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract long maxNumRecords();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Duration maxReadTime();

        abstract Builder builder();

        public Read withAddresses(List<String> list) {
            Preconditions.checkArgument(list != null, "addresses can not be null");
            Preconditions.checkArgument(!list.isEmpty(), "addresses can not be empty");
            return builder().setAddresses(list).build();
        }

        public Read withMaxNumRecords(long j) {
            return builder().setMaxNumRecords(j).build();
        }

        public Read withMaxReadTime(Duration duration) {
            return builder().setMaxReadTime(duration).build();
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(DisplayData.item("addresses", Joiner.on(" ").join(addresses())));
        }

        public PCollection<Message> expand(PBegin pBegin) {
            Preconditions.checkArgument(addresses() != null, "withAddresses() is required");
            PTransform from = org.apache.beam.sdk.io.Read.from(new UnboundedAmqpSource(this));
            PTransform pTransform = from;
            if (maxNumRecords() < Long.MAX_VALUE || maxReadTime() != null) {
                pTransform = from.withMaxReadTime(maxReadTime()).withMaxNumRecords(maxNumRecords());
            }
            return pBegin.getPipeline().apply(pTransform);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/amqp/AmqpIO$UnboundedAmqpReader.class */
    public static class UnboundedAmqpReader extends UnboundedSource.UnboundedReader<Message> {
        private final UnboundedAmqpSource source;
        private Messenger messenger;
        private Instant currentTimestamp;
        private AmqpCheckpointMark checkpointMark;
        private Instant watermark = new Instant(Long.MIN_VALUE);
        private Message current = null;

        public UnboundedAmqpReader(UnboundedAmqpSource unboundedAmqpSource, AmqpCheckpointMark amqpCheckpointMark) {
            this.source = unboundedAmqpSource;
            if (amqpCheckpointMark != null) {
                this.checkpointMark = amqpCheckpointMark;
            } else {
                this.checkpointMark = new AmqpCheckpointMark();
            }
        }

        public Instant getWatermark() {
            return this.watermark;
        }

        public Instant getCurrentTimestamp() {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.currentTimestamp;
        }

        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        public Message m245getCurrent() {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return this.checkpointMark;
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public UnboundedAmqpSource m244getCurrentSource() {
            return this.source;
        }

        public boolean start() throws IOException {
            Read read = this.source.spec;
            this.messenger = Messenger.Factory.create();
            this.messenger.start();
            Iterator<String> it = read.addresses().iterator();
            while (it.hasNext()) {
                this.messenger.subscribe(it.next());
            }
            this.checkpointMark.messenger = this.messenger;
            return advance();
        }

        public boolean advance() {
            this.messenger.recv();
            if (this.messenger.incoming() <= 0) {
                this.current = null;
                return false;
            }
            Message message = this.messenger.get();
            this.checkpointMark.trackers.add(this.messenger.incomingTracker());
            this.currentTimestamp = new Instant(message.getCreationTime());
            this.watermark = this.currentTimestamp;
            this.current = message;
            return true;
        }

        public void close() {
            if (this.messenger != null) {
                this.messenger.stop();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/amqp/AmqpIO$UnboundedAmqpSource.class */
    public static class UnboundedAmqpSource extends UnboundedSource<Message, AmqpCheckpointMark> {
        private final Read spec;

        public UnboundedAmqpSource(Read read) {
            this.spec = read;
        }

        public List<UnboundedAmqpSource> split(int i, PipelineOptions pipelineOptions) {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < Math.max(1, i); i2++) {
                arrayList.add(new UnboundedAmqpSource(this.spec));
            }
            return arrayList;
        }

        public UnboundedSource.UnboundedReader<Message> createReader(PipelineOptions pipelineOptions, AmqpCheckpointMark amqpCheckpointMark) {
            return new UnboundedAmqpReader(this, amqpCheckpointMark);
        }

        public Coder<Message> getOutputCoder() {
            return new AmqpMessageCoder();
        }

        public Coder<AmqpCheckpointMark> getCheckpointMarkCoder() {
            return SerializableCoder.of(AmqpCheckpointMark.class);
        }
    }

    /**
     * A {@link PTransform} that sends every {@link Message} of the input collection through a
     * Qpid Proton {@link Messenger}.
     */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/amqp/AmqpIO$Write.class */
    public static abstract class Write extends PTransform<PCollection<Message>, PDone> {

        /**
         * {@link DoFn} that owns one {@link Messenger}: created and started in setup, used to
         * put and send each element, and stopped in teardown.
         */
        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/amqp/AmqpIO$Write$WriteFn.class */
        public static class WriteFn extends DoFn<Message, Void> {
            private final Write spec;
            // Transient: the messenger is re-created in setup() rather than serialized.
            private transient Messenger messenger;

            /** @param write the transform this function was created for */
            public WriteFn(Write write) {
                this.spec = write;
            }

            /** Creates and starts the messenger. */
            @DoFn.Setup
            public void setup() throws Exception {
                Messenger created = Messenger.Factory.create();
                this.messenger = created;
                created.start();
            }

            /** Queues the element on the messenger and sends it immediately. */
            @DoFn.ProcessElement
            public void processElement(DoFn<Message, Void>.ProcessContext processContext) throws Exception {
                Message outgoing = processContext.element();
                this.messenger.put(outgoing);
                this.messenger.send();
            }

            /** Stops the messenger if setup created one. */
            @DoFn.Teardown
            public void teardown() throws Exception {
                if (this.messenger == null) {
                    return;
                }
                this.messenger.stop();
            }
        }

        /** Applies a {@link ParDo} of {@link WriteFn} to the input and returns {@link PDone}. */
        public PDone expand(PCollection<Message> pCollection) {
            WriteFn writer = new WriteFn(this);
            pCollection.apply(ParDo.of(writer));
            return PDone.in(pCollection.getPipeline());
        }
    }

    /**
     * Creates a {@link Read} transform with the record limit preset to {@link Long#MAX_VALUE};
     * no other property is set by this factory.
     */
    public static Read read() {
        Read.Builder defaults = new AutoValue_AmqpIO_Read.Builder();
        defaults = defaults.setMaxNumRecords(Long.MAX_VALUE);
        return defaults.build();
    }

    /** Creates a {@link Write} transform. */
    public static Write write() {
        Write transform = new AutoValue_AmqpIO_Write();
        return transform;
    }

    /** Private: the class only exposes the static {@code read()} and {@code write()} factories. */
    private AmqpIO() {}
}
