/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.mqtt;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_io_mqtt.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_sdks_java_io_mqtt.com.google.common.base.Preconditions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.mqtt.AutoValue_MqttIO_ConnectionConfiguration;
import org.apache.beam.sdk.io.mqtt.AutoValue_MqttIO_Read;
import org.apache.beam.sdk.io.mqtt.AutoValue_MqttIO_Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class MqttIO {
    private static final Logger LOG = LoggerFactory.getLogger(MqttIO.class);

    public static Read read() {
        return new AutoValue_MqttIO_Read.Builder().setMaxReadTime(null).setMaxNumRecords(Long.MAX_VALUE).build();
    }

    public static Write write() {
        return new AutoValue_MqttIO_Write.Builder().setRetained(false).build();
    }

    private MqttIO() {
    }

    @AutoValue
    public static abstract class Write
    extends PTransform<PCollection<byte[]>, PDone> {
        @Nullable
        abstract ConnectionConfiguration connectionConfiguration();

        abstract boolean retained();

        abstract Builder builder();

        public Write withConnectionConfiguration(ConnectionConfiguration configuration) {
            Preconditions.checkArgument(configuration != null, "configuration can not be null");
            return this.builder().setConnectionConfiguration(configuration).build();
        }

        public Write withRetained(boolean retained) {
            return this.builder().setRetained(retained).build();
        }

        public PDone expand(PCollection<byte[]> input) {
            input.apply((PTransform)ParDo.of((DoFn)new WriteFn(this)));
            return PDone.in((Pipeline)input.getPipeline());
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            this.connectionConfiguration().populateDisplayData(builder);
            builder.add(DisplayData.item((String)"retained", (Boolean)this.retained()));
        }

        private static class WriteFn
        extends DoFn<byte[], Void> {
            private final Write spec;
            private transient MQTT client;
            private transient BlockingConnection connection;

            public WriteFn(Write spec) {
                this.spec = spec;
            }

            @DoFn.Setup
            public void createMqttClient() throws Exception {
                LOG.debug("Starting MQTT writer");
                this.client = this.spec.connectionConfiguration().createClient();
                LOG.debug("MQTT writer client ID is {}", (Object)this.client.getClientId());
                this.connection = this.client.blockingConnection();
                this.connection.connect();
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext context) throws Exception {
                byte[] payload = (byte[])context.element();
                LOG.debug("Sending message {}", (Object)new String(payload, StandardCharsets.UTF_8));
                this.connection.publish(this.spec.connectionConfiguration().getTopic(), payload, QoS.AT_LEAST_ONCE, false);
            }

            @DoFn.Teardown
            public void closeMqttClient() throws Exception {
                if (this.connection != null) {
                    LOG.debug("Disconnecting MQTT connection (client ID {})", (Object)this.client.getClientId());
                    this.connection.disconnect();
                }
            }
        }

        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setConnectionConfiguration(ConnectionConfiguration var1);

            abstract Builder setRetained(boolean var1);

            abstract Write build();
        }
    }

    @VisibleForTesting
    static class UnboundedMqttReader
    extends UnboundedSource.UnboundedReader<byte[]> {
        private final UnboundedMqttSource source;
        private MQTT client;
        private BlockingConnection connection;
        private byte[] current;
        private Instant currentTimestamp;
        private MqttCheckpointMark checkpointMark;

        public UnboundedMqttReader(UnboundedMqttSource source, MqttCheckpointMark checkpointMark) {
            this.source = source;
            this.current = null;
            this.checkpointMark = checkpointMark != null ? checkpointMark : new MqttCheckpointMark();
        }

        public boolean start() throws IOException {
            LOG.debug("Starting MQTT reader ...");
            Read spec = this.source.spec;
            try {
                this.client = spec.connectionConfiguration().createClient();
                LOG.debug("Reader client ID is {}", (Object)this.client.getClientId());
                this.checkpointMark.clientId = this.client.getClientId().toString();
                this.connection = this.client.blockingConnection();
                this.connection.connect();
                this.connection.subscribe(new Topic[]{new Topic(spec.connectionConfiguration().getTopic(), QoS.AT_LEAST_ONCE)});
                return this.advance();
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }

        public boolean advance() throws IOException {
            try {
                LOG.trace("MQTT reader (client ID {}) waiting message ...", (Object)this.client.getClientId());
                Message message = this.connection.receive(1L, TimeUnit.SECONDS);
                if (message == null) {
                    return false;
                }
                this.current = message.getPayload();
                this.currentTimestamp = Instant.now();
                this.checkpointMark.add(message, this.currentTimestamp);
            }
            catch (Exception e) {
                throw new IOException(e);
            }
            return true;
        }

        public void close() throws IOException {
            LOG.debug("Closing MQTT reader (client ID {})", (Object)this.client.getClientId());
            try {
                if (this.connection != null) {
                    this.connection.disconnect();
                }
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }

        public Instant getWatermark() {
            return this.checkpointMark.oldestMessageTimestamp;
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return this.checkpointMark;
        }

        public byte[] getCurrent() {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        public Instant getCurrentTimestamp() {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.currentTimestamp;
        }

        public UnboundedMqttSource getCurrentSource() {
            return this.source;
        }
    }

    @VisibleForTesting
    static class UnboundedMqttSource
    extends UnboundedSource<byte[], MqttCheckpointMark> {
        private final Read spec;

        public UnboundedMqttSource(Read spec) {
            this.spec = spec;
        }

        public UnboundedSource.UnboundedReader<byte[]> createReader(PipelineOptions options, MqttCheckpointMark checkpointMark) {
            return new UnboundedMqttReader(this, checkpointMark);
        }

        public List<UnboundedMqttSource> split(int desiredNumSplits, PipelineOptions options) {
            return Collections.singletonList(new UnboundedMqttSource(this.spec));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            this.spec.populateDisplayData(builder);
        }

        public Coder<MqttCheckpointMark> getCheckpointMarkCoder() {
            return SerializableCoder.of(MqttCheckpointMark.class);
        }

        public Coder<byte[]> getOutputCoder() {
            return ByteArrayCoder.of();
        }
    }

    @VisibleForTesting
    static class MqttCheckpointMark
    implements UnboundedSource.CheckpointMark,
    Serializable {
        private String clientId;
        private Instant oldestMessageTimestamp = Instant.now();
        private transient List<Message> messages = new ArrayList<Message>();

        public void add(Message message, Instant timestamp) {
            if (timestamp.isBefore((ReadableInstant)this.oldestMessageTimestamp)) {
                this.oldestMessageTimestamp = timestamp;
            }
            this.messages.add(message);
        }

        public void finalizeCheckpoint() {
            LOG.debug("Finalizing checkpoint acknowledging pending messages for client ID {}", (Object)this.clientId);
            for (Message message : this.messages) {
                try {
                    message.ack();
                }
                catch (Exception e) {
                    LOG.warn("Can't ack message for client ID {}", (Object)this.clientId, (Object)e);
                }
            }
            this.oldestMessageTimestamp = Instant.now();
            this.messages.clear();
        }

        private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
            this.messages = new ArrayList<Message>();
        }
    }

    @AutoValue
    public static abstract class Read
    extends PTransform<PBegin, PCollection<byte[]>> {
        @Nullable
        abstract ConnectionConfiguration connectionConfiguration();

        abstract long maxNumRecords();

        @Nullable
        abstract Duration maxReadTime();

        abstract Builder builder();

        public Read withConnectionConfiguration(ConnectionConfiguration configuration) {
            Preconditions.checkArgument(configuration != null, "configuration can not be null");
            return this.builder().setConnectionConfiguration(configuration).build();
        }

        public Read withMaxNumRecords(long maxNumRecords) {
            return this.builder().setMaxNumRecords(maxNumRecords).build();
        }

        public Read withMaxReadTime(Duration maxReadTime) {
            return this.builder().setMaxReadTime(maxReadTime).build();
        }

        public PCollection<byte[]> expand(PBegin input) {
            Read.Unbounded unbounded;
            Read.Unbounded transform = unbounded = org.apache.beam.sdk.io.Read.from((UnboundedSource)new UnboundedMqttSource(this));
            if (this.maxNumRecords() < Long.MAX_VALUE || this.maxReadTime() != null) {
                transform = unbounded.withMaxReadTime(this.maxReadTime()).withMaxNumRecords(this.maxNumRecords());
            }
            return (PCollection)input.getPipeline().apply((PTransform)transform);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.connectionConfiguration().populateDisplayData(builder);
            if (this.maxNumRecords() != Long.MAX_VALUE) {
                builder.add(DisplayData.item((String)"maxNumRecords", (Long)this.maxNumRecords()));
            }
            builder.addIfNotNull(DisplayData.item((String)"maxReadTime", (Duration)this.maxReadTime()));
        }

        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setConnectionConfiguration(ConnectionConfiguration var1);

            abstract Builder setMaxNumRecords(long var1);

            abstract Builder setMaxReadTime(Duration var1);

            abstract Read build();
        }
    }

    @AutoValue
    public static abstract class ConnectionConfiguration
    implements Serializable {
        @Nullable
        abstract String getServerUri();

        @Nullable
        abstract String getTopic();

        @Nullable
        abstract String getClientId();

        @Nullable
        abstract String getUsername();

        @Nullable
        abstract String getPassword();

        abstract Builder builder();

        public static ConnectionConfiguration create(String serverUri, String topic) {
            Preconditions.checkArgument(serverUri != null, "serverUri can not be null");
            Preconditions.checkArgument(topic != null, "topic can not be null");
            return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri).setTopic(topic).build();
        }

        public static ConnectionConfiguration create(String serverUri, String topic, String clientId) {
            Preconditions.checkArgument(serverUri != null, "serverUri can not be null");
            Preconditions.checkArgument(topic != null, "topic can not be null");
            Preconditions.checkArgument(clientId != null, "clientId can not be null");
            return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri).setTopic(topic).setClientId(clientId).build();
        }

        public ConnectionConfiguration withUsername(String username) {
            return this.builder().setUsername(username).build();
        }

        public ConnectionConfiguration withPassword(String password) {
            return this.builder().setPassword(password).build();
        }

        private void populateDisplayData(DisplayData.Builder builder) {
            builder.add(DisplayData.item((String)"serverUri", (String)this.getServerUri()));
            builder.add(DisplayData.item((String)"topic", (String)this.getTopic()));
            builder.addIfNotNull(DisplayData.item((String)"clientId", (String)this.getClientId()));
            builder.addIfNotNull(DisplayData.item((String)"username", (String)this.getUsername()));
        }

        private MQTT createClient() throws Exception {
            LOG.debug("Creating MQTT client to {}", (Object)this.getServerUri());
            MQTT client = new MQTT();
            client.setHost(this.getServerUri());
            if (this.getUsername() != null) {
                LOG.debug("MQTT client uses username {}", (Object)this.getUsername());
                client.setUserName(this.getUsername());
                client.setPassword(this.getPassword());
            }
            if (this.getClientId() != null) {
                String clientId = this.getClientId() + "-" + UUID.randomUUID().toString();
                LOG.debug("MQTT client id set to {}", (Object)clientId);
                client.setClientId(clientId);
            } else {
                String clientId = UUID.randomUUID().toString();
                LOG.debug("MQTT client id set to random value {}", (Object)clientId);
                client.setClientId(clientId);
            }
            return client;
        }

        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setServerUri(String var1);

            abstract Builder setTopic(String var1);

            abstract Builder setClientId(String var1);

            abstract Builder setUsername(String var1);

            abstract Builder setPassword(String var1);

            abstract ConnectionConfiguration build();
        }
    }
}

