/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.mqtt;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.mqtt.AutoValue_MqttIO_ConnectionConfiguration;
import org.apache.beam.sdk.io.mqtt.AutoValue_MqttIO_Read;
import org.apache.beam.sdk.io.mqtt.AutoValue_MqttIO_Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttIO {
    // Logger shared by the outer class and all nested transforms, readers and helpers in this file.
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(MqttIO.class);
    // Upper bound on the length of an MQTT 3.1 client identifier.
    // NOTE(review): this constant is not referenced by name in this file; ConnectionConfiguration.createClient
    // truncates client ids with the literal 23 instead.
    private static final @UnknownKeyFor @NonNull @Initialized int MQTT_3_1_MAX_CLIENT_ID_LENGTH = 23;

    /**
     * Creates a {@link Read} transform with no read-time limit ({@code maxReadTime == null}) and no
     * record limit ({@code maxNumRecords == Long.MAX_VALUE}). No connection configuration is set.
     *
     * @return the default read transform
     */
    public static @UnknownKeyFor @NonNull @Initialized Read read() {
        Read.Builder withoutTimeLimit = new AutoValue_MqttIO_Read.Builder().setMaxReadTime(null);
        Read.Builder withoutAnyLimit = withoutTimeLimit.setMaxNumRecords(Long.MAX_VALUE);
        return withoutAnyLimit.build();
    }

    /**
     * Creates a {@link Write} transform whose {@code retained} flag is {@code false}. No connection
     * configuration is set.
     *
     * @return the default write transform
     */
    public static @UnknownKeyFor @NonNull @Initialized Write write() {
        Write.Builder notRetained = new AutoValue_MqttIO_Write.Builder().setRetained(false);
        return notRetained.build();
    }

    /** Private constructor: prevents instantiation; use the static {@code read()} and {@code write()} factories. */
    private MqttIO() {
    }

    /**
     * A {@link PTransform} that publishes every {@code byte[]} element of its input to the MQTT topic
     * named by the configured {@link ConnectionConfiguration}.
     */
    @AutoValue
    public static abstract class Write
    extends PTransform<PCollection<byte[]>, PDone> {
        /** Broker and topic settings; nullable because it is only ever set through the builder. */
        abstract @Nullable @UnknownKeyFor @Initialized ConnectionConfiguration connectionConfiguration();

        /** Retain flag passed along with every published message. */
        abstract @UnknownKeyFor @NonNull @Initialized boolean retained();

        abstract @UnknownKeyFor @NonNull @Initialized Builder builder();

        /**
         * Returns a transform built from this one with the given connection configuration.
         *
         * @param configuration broker and topic settings; must not be {@code null}
         * @return the reconfigured transform
         * @throws IllegalArgumentException if {@code configuration} is {@code null}
         */
        public @UnknownKeyFor @NonNull @Initialized Write withConnectionConfiguration(@UnknownKeyFor @NonNull @Initialized ConnectionConfiguration configuration) {
            Preconditions.checkArgument(configuration != null, "configuration can not be null");
            return this.builder().setConnectionConfiguration(configuration).build();
        }

        /**
         * Returns a transform built from this one with the given retain flag.
         *
         * @param retained value of the retain flag used when publishing
         * @return the reconfigured transform
         */
        public @UnknownKeyFor @NonNull @Initialized Write withRetained(@UnknownKeyFor @NonNull @Initialized boolean retained) {
            return this.builder().setRetained(retained).build();
        }

        /**
         * Applies the publishing {@link DoFn} to {@code input}.
         *
         * @param input the payloads to publish
         * @return a {@link PDone} in the pipeline of {@code input}
         * @throws IllegalArgumentException if no connection configuration has been set
         */
        public @UnknownKeyFor @NonNull @Initialized PDone expand(@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> input) {
            // Fail at pipeline construction rather than with a NullPointerException inside the DoFn setup.
            Preconditions.checkArgument(this.connectionConfiguration() != null, "connectionConfiguration can not be null");
            input.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(input.getPipeline());
        }

        /**
         * Registers the connection settings (when present) and the retain flag as display data.
         *
         * @param builder the display data builder to populate
         */
        public void populateDisplayData(DisplayData.@UnknownKeyFor @NonNull @Initialized Builder builder) {
            super.populateDisplayData(builder);
            // The configuration is nullable; skip it instead of failing while describing the transform.
            ConnectionConfiguration configuration = this.connectionConfiguration();
            if (configuration != null) {
                configuration.populateDisplayData(builder);
            }
            builder.add(DisplayData.item("retained", this.retained()));
        }

        /** {@link DoFn} that owns one blocking MQTT connection and publishes each element on it. */
        private static class WriteFn
        extends DoFn<byte[], Void> {
            private final @UnknownKeyFor @NonNull @Initialized Write spec;
            // Created in createMqttClient(); transient, so they are not part of the serialized DoFn.
            private transient @UnknownKeyFor @NonNull @Initialized MQTT client;
            private transient @UnknownKeyFor @NonNull @Initialized BlockingConnection connection;

            public WriteFn(@UnknownKeyFor @NonNull @Initialized Write spec) {
                this.spec = spec;
            }

            /** Creates the MQTT client from the connection configuration and opens a blocking connection. */
            @DoFn.Setup
            public void createMqttClient() throws @UnknownKeyFor @NonNull @Initialized Exception {
                LOG.debug("Starting MQTT writer");
                this.client = this.spec.connectionConfiguration().createClient();
                LOG.debug("MQTT writer client ID is {}", this.client.getClientId());
                this.connection = this.client.blockingConnection();
                this.connection.connect();
            }

            /**
             * Publishes the current element to the configured topic with {@code AT_LEAST_ONCE} QoS,
             * using the retain flag configured on the enclosing {@link Write}.
             */
            @DoFn.ProcessElement
            public void processElement(DoFn<byte[], Void>.@UnknownKeyFor @NonNull @Initialized ProcessContext context) throws @UnknownKeyFor @NonNull @Initialized Exception {
                byte[] payload = context.element();
                // Decoding the payload is only worth it when the message will actually be logged.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sending message {}", new String(payload, StandardCharsets.UTF_8));
                }
                // Honour Write.withRetained(...): the flag used to be hard-coded to false here.
                this.connection.publish(this.spec.connectionConfiguration().getTopic(), payload, QoS.AT_LEAST_ONCE, this.spec.retained());
            }

            /** Disconnects the connection if one was opened. */
            @DoFn.Teardown
            public void closeMqttClient() throws @UnknownKeyFor @NonNull @Initialized Exception {
                if (this.connection != null) {
                    LOG.debug("Disconnecting MQTT connection (client ID {})", this.client.getClientId());
                    this.connection.disconnect();
                }
            }
        }

        /** AutoValue builder for {@link Write}. */
        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract @UnknownKeyFor @NonNull @Initialized Builder setConnectionConfiguration(@UnknownKeyFor @NonNull @Initialized ConnectionConfiguration var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder setRetained(@UnknownKeyFor @NonNull @Initialized boolean var1);

            abstract @UnknownKeyFor @NonNull @Initialized Write build();
        }
    }

    /**
     * Reader that receives messages from the configured MQTT topic over a blocking connection and
     * records each one in its {@link MqttCheckpointMark}.
     */
    @VisibleForTesting
    static class UnboundedMqttReader
    extends UnboundedSource.UnboundedReader<byte[]> {
        private final @UnknownKeyFor @NonNull @Initialized UnboundedMqttSource source;
        // client and connection stay null until start() has created them.
        private @UnknownKeyFor @NonNull @Initialized MQTT client;
        private @UnknownKeyFor @NonNull @Initialized BlockingConnection connection;
        // Payload of the last received message; null until the first message arrives.
        private @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] current;
        private @UnknownKeyFor @NonNull @Initialized Instant currentTimestamp;
        private @UnknownKeyFor @NonNull @Initialized MqttCheckpointMark checkpointMark;

        /**
         * @param source the source this reader belongs to
         * @param checkpointMark checkpoint to continue from; a fresh one is created when {@code null}
         */
        public UnboundedMqttReader(@UnknownKeyFor @NonNull @Initialized UnboundedMqttSource source, @UnknownKeyFor @NonNull @Initialized MqttCheckpointMark checkpointMark) {
            this.source = source;
            this.current = null;
            this.checkpointMark = checkpointMark != null ? checkpointMark : new MqttCheckpointMark();
        }

        /**
         * Creates the client, connects, subscribes to the configured topic with {@code AT_LEAST_ONCE}
         * QoS and attempts to read a first message.
         *
         * @return {@code true} if a message was received
         * @throws IOException if connecting, subscribing or receiving fails
         */
        public @UnknownKeyFor @NonNull @Initialized boolean start() throws @UnknownKeyFor @NonNull @Initialized IOException {
            LOG.debug("Starting MQTT reader ...");
            Read spec = this.source.spec;
            try {
                this.client = spec.connectionConfiguration().createClient();
                LOG.debug("Reader client ID is {}", this.client.getClientId());
                this.checkpointMark.clientId = this.client.getClientId().toString();
                this.connection = this.client.blockingConnection();
                this.connection.connect();
                this.connection.subscribe(new Topic[]{new Topic(spec.connectionConfiguration().getTopic(), QoS.AT_LEAST_ONCE)});
                return this.advance();
            }
            catch (IOException e) {
                // advance() already reports failures as IOException; do not wrap it a second time.
                throw e;
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }

        /**
         * Waits up to one second for the next message and, if one arrives, makes it the current
         * element and adds it to the checkpoint mark.
         *
         * @return {@code true} if a message was received, {@code false} if the wait timed out
         * @throws IOException if receiving fails
         */
        public @UnknownKeyFor @NonNull @Initialized boolean advance() throws @UnknownKeyFor @NonNull @Initialized IOException {
            try {
                LOG.trace("MQTT reader (client ID {}) waiting message ...", this.client.getClientId());
                Message message = this.connection.receive(1L, TimeUnit.SECONDS);
                if (message == null) {
                    return false;
                }
                this.current = message.getPayload();
                // The timestamp of an element is the time at which it was received.
                this.currentTimestamp = Instant.now();
                this.checkpointMark.add(message, this.currentTimestamp);
            }
            catch (Exception e) {
                throw new IOException(e);
            }
            return true;
        }

        /**
         * Disconnects from the broker if a connection was opened.
         *
         * @throws IOException if disconnecting fails
         */
        public void close() throws @UnknownKeyFor @NonNull @Initialized IOException {
            // The client is only created in start(); a reader closed before (or after a failed) start
            // must not fail with a NullPointerException here.
            if (this.client != null) {
                LOG.debug("Closing MQTT reader (client ID {})", this.client.getClientId());
            }
            try {
                if (this.connection != null) {
                    this.connection.disconnect();
                }
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }

        /** Returns the oldest message timestamp tracked by the checkpoint mark. */
        public @UnknownKeyFor @NonNull @Initialized Instant getWatermark() {
            return this.checkpointMark.oldestMessageTimestamp;
        }

        /** Returns the checkpoint mark that collects the messages received by this reader. */
        public UnboundedSource.@UnknownKeyFor @NonNull @Initialized CheckpointMark getCheckpointMark() {
            return this.checkpointMark;
        }

        /**
         * @return the payload of the last received message
         * @throws NoSuchElementException if no message has been received yet
         */
        public @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] getCurrent() {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        /**
         * @return the reception time of the last received message
         * @throws NoSuchElementException if no message has been received yet
         */
        public @UnknownKeyFor @NonNull @Initialized Instant getCurrentTimestamp() {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.currentTimestamp;
        }

        /** Returns the source this reader was created from. */
        public @UnknownKeyFor @NonNull @Initialized UnboundedMqttSource getCurrentSource() {
            return this.source;
        }
    }

    /** Unbounded source of {@code byte[]} payloads backed by the settings of a {@link Read} transform. */
    @VisibleForTesting
    static class UnboundedMqttSource
    extends UnboundedSource<byte[], MqttCheckpointMark> {
        private final @UnknownKeyFor @NonNull @Initialized Read spec;

        public UnboundedMqttSource(@UnknownKeyFor @NonNull @Initialized Read spec) {
            this.spec = spec;
        }

        /**
         * Creates a reader for this source.
         *
         * @param options pipeline options (not used by this implementation)
         * @param checkpointMark checkpoint to continue from; passed through to the reader
         * @return a new {@link UnboundedMqttReader}
         */
        public UnboundedSource.@UnknownKeyFor @NonNull @Initialized UnboundedReader<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> createReader(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized MqttCheckpointMark checkpointMark) {
            return new UnboundedMqttReader(this, checkpointMark);
        }

        /**
         * Returns a single split regardless of {@code desiredNumSplits}.
         *
         * @param desiredNumSplits requested number of splits (ignored)
         * @param options pipeline options (not used by this implementation)
         * @return a one-element list holding a new source with the same spec
         */
        public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized UnboundedMqttSource> split(@UnknownKeyFor @NonNull @Initialized int desiredNumSplits, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
            return Collections.singletonList(new UnboundedMqttSource(this.spec));
        }

        /** Delegates display data to the {@link Read} transform this source was built from. */
        public void populateDisplayData(DisplayData.@UnknownKeyFor @NonNull @Initialized Builder builder) {
            this.spec.populateDisplayData(builder);
        }

        /** Checkpoint marks are encoded with Java serialization. */
        public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized MqttCheckpointMark> getCheckpointMarkCoder() {
            return SerializableCoder.of(MqttCheckpointMark.class);
        }

        /** Elements are raw byte arrays. */
        public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> getOutputCoder() {
            return ByteArrayCoder.of();
        }
    }

    /**
     * Checkpoint mark that remembers the reader's client id, the oldest timestamp among the messages
     * added since the last finalization, and the messages still waiting to be acknowledged.
     */
    @VisibleForTesting
    static class MqttCheckpointMark
    implements UnboundedSource.CheckpointMark,
    Serializable {
        @VisibleForTesting
        @UnknownKeyFor @NonNull @Initialized String clientId;
        @VisibleForTesting
        @UnknownKeyFor @NonNull @Initialized Instant oldestMessageTimestamp = Instant.now();
        // Transient: pending messages are not serialized; readObject() starts again with an empty list.
        @VisibleForTesting
        transient @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Message> messages = new ArrayList<>();

        public MqttCheckpointMark() {
        }

        public MqttCheckpointMark(@UnknownKeyFor @NonNull @Initialized String id) {
            this.clientId = id;
        }

        /**
         * Records a received message as pending and lowers the oldest timestamp if {@code timestamp}
         * precedes it.
         *
         * @param message the message to acknowledge on finalization
         * @param timestamp the timestamp associated with the message
         */
        public void add(@UnknownKeyFor @NonNull @Initialized Message message, @UnknownKeyFor @NonNull @Initialized Instant timestamp) {
            boolean olderThanKnown = timestamp.isBefore(this.oldestMessageTimestamp);
            if (olderThanKnown) {
                this.oldestMessageTimestamp = timestamp;
            }
            this.messages.add(message);
        }

        /**
         * Acknowledges every pending message (a failed ack is logged and skipped), then resets the
         * oldest timestamp to now and empties the pending list.
         */
        public void finalizeCheckpoint() {
            LOG.debug("Finalizing checkpoint acknowledging pending messages for client ID {}", this.clientId);
            for (Message pending : this.messages) {
                try {
                    pending.ack();
                }
                catch (Exception ackFailure) {
                    LOG.warn("Can't ack message for client ID {}", this.clientId, ackFailure);
                }
            }
            this.oldestMessageTimestamp = Instant.now();
            this.messages.clear();
        }

        /** Restores the serialized fields and re-creates the transient pending list as empty. */
        private void readObject(@UnknownKeyFor @NonNull @Initialized ObjectInputStream stream) throws @UnknownKeyFor @NonNull @Initialized IOException, @UnknownKeyFor @NonNull @Initialized ClassNotFoundException {
            stream.defaultReadObject();
            this.messages = new ArrayList<>();
        }

        /** Two marks are equal when client id, oldest timestamp and pending messages all match. */
        @EnsuresNonNullIf(expression={"#1"}, result=true)
        @Pure
        public @UnknownKeyFor @NonNull @Initialized boolean equals(@Nullable @UnknownKeyFor @Initialized Object other) {
            if (!(other instanceof MqttCheckpointMark)) {
                return false;
            }
            MqttCheckpointMark that = (MqttCheckpointMark)other;
            if (!Objects.equals(this.clientId, that.clientId)) {
                return false;
            }
            if (!Objects.equals(this.oldestMessageTimestamp, that.oldestMessageTimestamp)) {
                return false;
            }
            return Objects.deepEquals(this.messages, that.messages);
        }

        /** Hash over the same three fields that {@link #equals(Object)} compares. */
        @Pure
        public @UnknownKeyFor @NonNull @Initialized int hashCode() {
            return Objects.hash(this.clientId, this.oldestMessageTimestamp, this.messages);
        }
    }

    /**
     * A {@link PTransform} that reads {@code byte[]} payloads from the MQTT topic named by the
     * configured {@link ConnectionConfiguration}, optionally bounded by a record count and/or a read
     * duration.
     */
    @AutoValue
    public static abstract class Read
    extends PTransform<PBegin, PCollection<byte[]>> {
        /** Broker and topic settings; nullable because it is only ever set through the builder. */
        abstract @Nullable @UnknownKeyFor @Initialized ConnectionConfiguration connectionConfiguration();

        /** Maximum number of records to read; {@code Long.MAX_VALUE} is treated as "no limit". */
        abstract @UnknownKeyFor @NonNull @Initialized long maxNumRecords();

        /** Maximum time to read for; {@code null} is treated as "no limit". */
        abstract @Nullable @UnknownKeyFor @Initialized Duration maxReadTime();

        abstract @UnknownKeyFor @NonNull @Initialized Builder builder();

        /**
         * Returns a transform built from this one with the given connection configuration.
         *
         * @param configuration broker and topic settings; must not be {@code null}
         * @return the reconfigured transform
         * @throws IllegalArgumentException if {@code configuration} is {@code null}
         */
        public @UnknownKeyFor @NonNull @Initialized Read withConnectionConfiguration(@UnknownKeyFor @NonNull @Initialized ConnectionConfiguration configuration) {
            Preconditions.checkArgument(configuration != null, "configuration can not be null");
            return this.builder().setConnectionConfiguration(configuration).build();
        }

        /**
         * Returns a transform built from this one with the given record limit.
         *
         * @param maxNumRecords maximum number of records to read
         * @return the reconfigured transform
         */
        public @UnknownKeyFor @NonNull @Initialized Read withMaxNumRecords(@UnknownKeyFor @NonNull @Initialized long maxNumRecords) {
            return this.builder().setMaxNumRecords(maxNumRecords).build();
        }

        /**
         * Returns a transform built from this one with the given read-time limit.
         *
         * @param maxReadTime maximum duration to read for
         * @return the reconfigured transform
         */
        public @UnknownKeyFor @NonNull @Initialized Read withMaxReadTime(@UnknownKeyFor @NonNull @Initialized Duration maxReadTime) {
            return this.builder().setMaxReadTime(maxReadTime).build();
        }

        /**
         * Applies a read from an {@link UnboundedMqttSource}; when a record limit or a read-time limit
         * is configured, both limits are applied to the read.
         *
         * @param input the pipeline start
         * @return the collection of message payloads
         * @throws IllegalArgumentException if no connection configuration has been set
         */
        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> expand(@UnknownKeyFor @NonNull @Initialized PBegin input) {
            // Fail at pipeline construction rather than with a NullPointerException when the reader starts.
            Preconditions.checkArgument(this.connectionConfiguration() != null, "connectionConfiguration can not be null");
            // Inside this class the simple name Read means MqttIO.Read, so the SDK transform has to be
            // fully qualified.
            org.apache.beam.sdk.io.Read.Unbounded<byte[]> unbounded = org.apache.beam.sdk.io.Read.from(new UnboundedMqttSource(this));
            // Typed as PTransform because the limited variant is not an Unbounded read.
            PTransform<PBegin, PCollection<byte[]>> transform = unbounded;
            if (this.maxNumRecords() < Long.MAX_VALUE || this.maxReadTime() != null) {
                transform = unbounded.withMaxReadTime(this.maxReadTime()).withMaxNumRecords(this.maxNumRecords());
            }
            return input.getPipeline().apply(transform);
        }

        /**
         * Registers the connection settings (when present) and any configured limits as display data.
         *
         * @param builder the display data builder to populate
         */
        public void populateDisplayData(DisplayData.@UnknownKeyFor @NonNull @Initialized Builder builder) {
            super.populateDisplayData(builder);
            // The configuration is nullable; skip it instead of failing while describing the transform.
            ConnectionConfiguration configuration = this.connectionConfiguration();
            if (configuration != null) {
                configuration.populateDisplayData(builder);
            }
            if (this.maxNumRecords() != Long.MAX_VALUE) {
                builder.add(DisplayData.item("maxNumRecords", this.maxNumRecords()));
            }
            builder.addIfNotNull(DisplayData.item("maxReadTime", this.maxReadTime()));
        }

        /** AutoValue builder for {@link Read}. */
        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract @UnknownKeyFor @NonNull @Initialized Builder setConnectionConfiguration(@UnknownKeyFor @NonNull @Initialized ConnectionConfiguration var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder setMaxNumRecords(@UnknownKeyFor @NonNull @Initialized long var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder setMaxReadTime(@UnknownKeyFor @NonNull @Initialized Duration var1);

            abstract @UnknownKeyFor @NonNull @Initialized Read build();
        }
    }

    /**
     * Serializable description of an MQTT connection: server URI, topic, and optional client id,
     * username and password.
     */
    @AutoValue
    public static abstract class ConnectionConfiguration
    implements Serializable {
        abstract @UnknownKeyFor @NonNull @Initialized String getServerUri();

        abstract @UnknownKeyFor @NonNull @Initialized String getTopic();

        abstract @Nullable @UnknownKeyFor @Initialized String getClientId();

        abstract @Nullable @UnknownKeyFor @Initialized String getUsername();

        abstract @Nullable @UnknownKeyFor @Initialized String getPassword();

        abstract @UnknownKeyFor @NonNull @Initialized Builder builder();

        /**
         * Creates a configuration for the given server and topic.
         *
         * @param serverUri URI of the MQTT server; must not be {@code null}
         * @param topic topic to read from or publish to; must not be {@code null}
         * @return the new configuration
         * @throws IllegalArgumentException if either argument is {@code null}
         */
        public static @UnknownKeyFor @NonNull @Initialized ConnectionConfiguration create(@UnknownKeyFor @NonNull @Initialized String serverUri, @UnknownKeyFor @NonNull @Initialized String topic) {
            Preconditions.checkArgument(serverUri != null, "serverUri can not be null");
            Preconditions.checkArgument(topic != null, "topic can not be null");
            return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri).setTopic(topic).build();
        }

        /**
         * Returns a configuration built from this one with the given server URI.
         *
         * @throws IllegalArgumentException if {@code serverUri} is {@code null}
         */
        public @UnknownKeyFor @NonNull @Initialized ConnectionConfiguration withServerUri(@UnknownKeyFor @NonNull @Initialized String serverUri) {
            Preconditions.checkArgument(serverUri != null, "serverUri can not be null");
            return this.builder().setServerUri(serverUri).build();
        }

        /**
         * Returns a configuration built from this one with the given topic.
         *
         * @throws IllegalArgumentException if {@code topic} is {@code null}
         */
        public @UnknownKeyFor @NonNull @Initialized ConnectionConfiguration withTopic(@UnknownKeyFor @NonNull @Initialized String topic) {
            Preconditions.checkArgument(topic != null, "topic can not be null");
            return this.builder().setTopic(topic).build();
        }

        /**
         * Returns a configuration built from this one with the given client id prefix.
         *
         * @throws IllegalArgumentException if {@code clientId} is {@code null}
         */
        public @UnknownKeyFor @NonNull @Initialized ConnectionConfiguration withClientId(@UnknownKeyFor @NonNull @Initialized String clientId) {
            Preconditions.checkArgument(clientId != null, "clientId can not be null");
            return this.builder().setClientId(clientId).build();
        }

        /**
         * Returns a configuration built from this one with the given username.
         *
         * @throws IllegalArgumentException if {@code username} is {@code null}
         */
        public @UnknownKeyFor @NonNull @Initialized ConnectionConfiguration withUsername(@UnknownKeyFor @NonNull @Initialized String username) {
            Preconditions.checkArgument(username != null, "username can not be null");
            return this.builder().setUsername(username).build();
        }

        /**
         * Returns a configuration built from this one with the given password.
         *
         * @throws IllegalArgumentException if {@code password} is {@code null}
         */
        public @UnknownKeyFor @NonNull @Initialized ConnectionConfiguration withPassword(@UnknownKeyFor @NonNull @Initialized String password) {
            Preconditions.checkArgument(password != null, "password can not be null");
            return this.builder().setPassword(password).build();
        }

        /**
         * Adds server URI and topic, plus client id and username when set, to the display data. The
         * password is not added.
         */
        private void populateDisplayData(DisplayData.@UnknownKeyFor @NonNull @Initialized Builder builder) {
            builder.add(DisplayData.item("serverUri", this.getServerUri()));
            builder.add(DisplayData.item("topic", this.getTopic()));
            builder.addIfNotNull(DisplayData.item("clientId", this.getClientId()));
            builder.addIfNotNull(DisplayData.item("username", this.getUsername()));
        }

        /**
         * Builds an {@link MQTT} client for this configuration. Credentials are applied only when a
         * username is set. The client id is the configured id followed by {@code "-"} and a random
         * UUID, or just a random UUID when no id is configured, cut to at most 23 characters.
         *
         * @return the configured, not yet connected client
         * @throws Exception if the client rejects the server URI
         */
        private @UnknownKeyFor @NonNull @Initialized MQTT createClient() throws @UnknownKeyFor @NonNull @Initialized Exception {
            LOG.debug("Creating MQTT client to {}", this.getServerUri());
            MQTT client = new MQTT();
            client.setHost(this.getServerUri());
            String username = this.getUsername();
            if (username != null) {
                LOG.debug("MQTT client uses username {}", username);
                client.setUserName(username);
                client.setPassword(this.getPassword());
            }
            String configuredId = this.getClientId();
            String randomPart = UUID.randomUUID().toString();
            String candidate = configuredId != null ? configuredId + "-" + randomPart : randomPart;
            // 23 is the MQTT 3.1 limit on client identifier length.
            // NOTE(review): a configured id of 23 or more characters loses its random suffix entirely
            // after truncation - confirm that non-unique client ids are acceptable in that case.
            String clientId = candidate.substring(0, Math.min(candidate.length(), 23));
            if (configuredId != null) {
                LOG.debug("MQTT client id set to {}", clientId);
            } else {
                LOG.debug("MQTT client id set to random value {}", clientId);
            }
            client.setClientId(clientId);
            return client;
        }

        /** AutoValue builder for {@link ConnectionConfiguration}. */
        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract @UnknownKeyFor @NonNull @Initialized Builder setServerUri(@UnknownKeyFor @NonNull @Initialized String var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder setTopic(@UnknownKeyFor @NonNull @Initialized String var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder setClientId(@UnknownKeyFor @NonNull @Initialized String var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder setUsername(@UnknownKeyFor @NonNull @Initialized String var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder setPassword(@UnknownKeyFor @NonNull @Initialized String var1);

            abstract @UnknownKeyFor @NonNull @Initialized ConnectionConfiguration build();
        }
    }
}

