package org.apache.beam.sdk.io.mqtt;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.mqtt.AutoValue_MqttIO_ConnectionConfiguration;
import org.apache.beam.sdk.io.mqtt.AutoValue_MqttIO_Read;
import org.apache.beam.sdk.io.mqtt.AutoValue_MqttIO_Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdks.java.io.mqtt.repackaged.com.google.common.base.Preconditions;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIO.class */
public class MqttIO {
    private static final Logger LOG = LoggerFactory.getLogger(MqttIO.class);

    /* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIO$ConnectionConfiguration.class */
    public static abstract class ConnectionConfiguration implements Serializable {

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIO$ConnectionConfiguration$Builder.class */
        public static abstract class Builder {
            abstract Builder setServerUri(String str);

            abstract Builder setTopic(String str);

            abstract Builder setClientId(String str);

            abstract Builder setUsername(String str);

            abstract Builder setPassword(String str);

            abstract ConnectionConfiguration build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract String getServerUri();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract String getTopic();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract String getClientId();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract String getUsername();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract String getPassword();

        abstract Builder builder();

        public static ConnectionConfiguration create(String str, String str2) {
            Preconditions.checkArgument(str != null, "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null serverUri");
            Preconditions.checkArgument(str2 != null, "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null topic");
            return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(str).setTopic(str2).build();
        }

        public static ConnectionConfiguration create(String str, String str2, String str3) {
            Preconditions.checkArgument(str != null, "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null serverUri");
            Preconditions.checkArgument(str2 != null, "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null topic");
            Preconditions.checkArgument(str3 != null, "MqttIO.ConnectionConfiguration.create(serverUri,topic, clientId) called with null clientId");
            return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(str).setTopic(str2).setClientId(str3).build();
        }

        public ConnectionConfiguration withUsername(String str) {
            return builder().setUsername(str).build();
        }

        public ConnectionConfiguration withPassword(String str) {
            return builder().setPassword(str).build();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(DisplayData.item("serverUri", getServerUri()));
            builder.add(DisplayData.item("topic", getTopic()));
            builder.addIfNotNull(DisplayData.item("clientId", getClientId()));
            builder.addIfNotNull(DisplayData.item("username", getUsername()));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public MQTT createClient() throws Exception {
            MqttIO.LOG.debug("Creating MQTT client to {}", getServerUri());
            MQTT mqtt = new MQTT();
            mqtt.setHost(getServerUri());
            if (getUsername() != null) {
                MqttIO.LOG.debug("MQTT client uses username {}", getUsername());
                mqtt.setUserName(getUsername());
                mqtt.setPassword(getPassword());
            }
            if (getClientId() != null) {
                String str = getClientId() + "-" + UUID.randomUUID().toString();
                MqttIO.LOG.debug("MQTT client id set to {}", str);
                mqtt.setClientId(str);
            } else {
                String uuid = UUID.randomUUID().toString();
                MqttIO.LOG.debug("MQTT client id set to random value {}", uuid);
                mqtt.setClientId(uuid);
            }
            return mqtt;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIO$MqttCheckpointMark.class */
    public static class MqttCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
        private String clientId;
        private Instant oldestMessageTimestamp = Instant.now();
        private transient List<Message> messages = new ArrayList();

        public void add(Message message, Instant instant) {
            if (instant.isBefore(this.oldestMessageTimestamp)) {
                this.oldestMessageTimestamp = instant;
            }
            this.messages.add(message);
        }

        public void finalizeCheckpoint() {
            MqttIO.LOG.debug("Finalizing checkpoint acknowledging pending messages for client ID {}", this.clientId);
            Iterator<Message> it = this.messages.iterator();
            while (it.hasNext()) {
                try {
                    it.next().ack();
                } catch (Exception e) {
                    MqttIO.LOG.warn("Can't ack message for client ID {}", this.clientId, e);
                }
            }
            this.oldestMessageTimestamp = Instant.now();
            this.messages.clear();
        }

        private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
            this.messages = new ArrayList();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIO$Read.class */
    public static abstract class Read extends PTransform<PBegin, PCollection<byte[]>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIO$Read$Builder.class */
        public static abstract class Builder {
            abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);

            abstract Builder setMaxNumRecords(long j);

            abstract Builder setMaxReadTime(Duration duration);

            abstract Read build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract ConnectionConfiguration connectionConfiguration();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract long maxNumRecords();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Duration maxReadTime();

        abstract Builder builder();

        public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "MqttIO.read().withConnectionConfiguration(configuration) called with null configuration or not called at all");
            return builder().setConnectionConfiguration(connectionConfiguration).build();
        }

        public Read withMaxNumRecords(long j) {
            Preconditions.checkArgument(maxReadTime() == null, "maxNumRecord and maxReadTime are exclusive");
            return builder().setMaxNumRecords(j).build();
        }

        public Read withMaxReadTime(Duration duration) {
            Preconditions.checkArgument(maxNumRecords() == Long.MAX_VALUE, "maxNumRecord and maxReadTime are exclusive");
            return builder().setMaxReadTime(duration).build();
        }

        public PCollection<byte[]> expand(PBegin pBegin) {
            BoundedReadFromUnboundedSource from = org.apache.beam.sdk.io.Read.from(new UnboundedMqttSource(this));
            BoundedReadFromUnboundedSource boundedReadFromUnboundedSource = from;
            if (maxNumRecords() != Long.MAX_VALUE) {
                boundedReadFromUnboundedSource = from.withMaxNumRecords(maxNumRecords());
            } else if (maxReadTime() != null) {
                boundedReadFromUnboundedSource = from.withMaxReadTime(maxReadTime());
            }
            return pBegin.getPipeline().apply(boundedReadFromUnboundedSource);
        }

        public void validate(PipelineOptions pipelineOptions) {
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            connectionConfiguration().populateDisplayData(builder);
            if (maxNumRecords() != Long.MAX_VALUE) {
                builder.add(DisplayData.item("maxNumRecords", Long.valueOf(maxNumRecords())));
            }
            builder.addIfNotNull(DisplayData.item("maxReadTime", maxReadTime()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIO$UnboundedMqttReader.class */
    public static class UnboundedMqttReader extends UnboundedSource.UnboundedReader<byte[]> {
        private final UnboundedMqttSource source;
        private MQTT client;
        private BlockingConnection connection;
        private byte[] current = null;
        private Instant currentTimestamp;
        private MqttCheckpointMark checkpointMark;

        public UnboundedMqttReader(UnboundedMqttSource unboundedMqttSource, MqttCheckpointMark mqttCheckpointMark) {
            this.source = unboundedMqttSource;
            if (mqttCheckpointMark != null) {
                this.checkpointMark = mqttCheckpointMark;
            } else {
                this.checkpointMark = new MqttCheckpointMark();
            }
        }

        public boolean start() throws IOException {
            MqttIO.LOG.debug("Starting MQTT reader ...");
            Read read = this.source.spec;
            try {
                this.client = read.connectionConfiguration().createClient();
                MqttIO.LOG.debug("Reader client ID is {}", this.client.getClientId());
                this.checkpointMark.clientId = this.client.getClientId().toString();
                this.connection = this.client.blockingConnection();
                this.connection.connect();
                this.connection.subscribe(new Topic[]{new Topic(read.connectionConfiguration().getTopic(), QoS.AT_LEAST_ONCE)});
                return advance();
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        public boolean advance() throws IOException {
            try {
                MqttIO.LOG.debug("MQTT reader (client ID {}) waiting message ...", this.client.getClientId());
                Message receive = this.connection.receive();
                this.current = receive.getPayload();
                this.currentTimestamp = Instant.now();
                this.checkpointMark.add(receive, this.currentTimestamp);
                return true;
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        public void close() throws IOException {
            MqttIO.LOG.debug("Closing MQTT reader (client ID {})", this.client.getClientId());
            try {
                if (this.connection != null) {
                    this.connection.disconnect();
                }
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        public Instant getWatermark() {
            return this.checkpointMark.oldestMessageTimestamp;
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return this.checkpointMark;
        }

        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        public byte[] m3getCurrent() {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        public Instant getCurrentTimestamp() {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.currentTimestamp;
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public UnboundedMqttSource m2getCurrentSource() {
            return this.source;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIO$UnboundedMqttSource.class */
    public static class UnboundedMqttSource extends UnboundedSource<byte[], MqttCheckpointMark> {
        private final Read spec;

        public UnboundedMqttSource(Read read) {
            this.spec = read;
        }

        public UnboundedSource.UnboundedReader<byte[]> createReader(PipelineOptions pipelineOptions, MqttCheckpointMark mqttCheckpointMark) {
            return new UnboundedMqttReader(this, mqttCheckpointMark);
        }

        public List<UnboundedMqttSource> split(int i, PipelineOptions pipelineOptions) {
            return Collections.singletonList(new UnboundedMqttSource(this.spec));
        }

        public void validate() {
            this.spec.validate(null);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            this.spec.populateDisplayData(builder);
        }

        public Coder<MqttCheckpointMark> getCheckpointMarkCoder() {
            return SerializableCoder.of(MqttCheckpointMark.class);
        }

        public Coder<byte[]> getDefaultOutputCoder() {
            return ByteArrayCoder.of();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIO$Write.class */
    public static abstract class Write extends PTransform<PCollection<byte[]>, PDone> {

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIO$Write$Builder.class */
        public static abstract class Builder {
            abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration);

            abstract Builder setRetained(boolean z);

            abstract Write build();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIO$Write$WriteFn.class */
        public static class WriteFn extends DoFn<byte[], Void> {
            private final Write spec;
            private transient MQTT client;
            private transient BlockingConnection connection;

            public WriteFn(Write write) {
                this.spec = write;
            }

            @DoFn.Setup
            public void createMqttClient() throws Exception {
                MqttIO.LOG.debug("Starting MQTT writer");
                this.client = this.spec.connectionConfiguration().createClient();
                MqttIO.LOG.debug("MQTT writer client ID is {}", this.client.getClientId());
                this.connection = this.client.blockingConnection();
                this.connection.connect();
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<byte[], Void>.ProcessContext processContext) throws Exception {
                byte[] bArr = (byte[]) processContext.element();
                MqttIO.LOG.debug("Sending message {}", new String(bArr));
                this.connection.publish(this.spec.connectionConfiguration().getTopic(), bArr, QoS.AT_LEAST_ONCE, false);
            }

            @DoFn.Teardown
            public void closeMqttClient() throws Exception {
                if (this.connection != null) {
                    MqttIO.LOG.debug("Disconnecting MQTT connection (client ID {})", this.client.getClientId());
                    this.connection.disconnect();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract ConnectionConfiguration connectionConfiguration();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract boolean retained();

        abstract Builder builder();

        public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) {
            Preconditions.checkArgument(connectionConfiguration != null, "MqttIO.write().withConnectionConfiguration(configuration) called with null configuration or not called at all");
            return builder().setConnectionConfiguration(connectionConfiguration).build();
        }

        public Write withRetained(boolean z) {
            return builder().setRetained(z).build();
        }

        public PDone expand(PCollection<byte[]> pCollection) {
            pCollection.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(pCollection.getPipeline());
        }

        public void validate(PipelineOptions pipelineOptions) {
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            connectionConfiguration().populateDisplayData(builder);
            builder.add(DisplayData.item("retained", Boolean.valueOf(retained())));
        }
    }

    public static Read read() {
        return new AutoValue_MqttIO_Read.Builder().setMaxReadTime(null).setMaxNumRecords(Long.MAX_VALUE).build();
    }

    public static Write write() {
        return new AutoValue_MqttIO_Write.Builder().setRetained(false).build();
    }

    private MqttIO() {
    }
}
