/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx2.aws;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.github.davidmoten.guavamini.Preconditions;
import com.github.davidmoten.rx2.aws.SqsMessage;
import com.github.davidmoten.rx2.aws.Util;
import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.functions.BiConsumer;
import io.reactivex.schedulers.Schedulers;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;

/**
 * Static helpers and a fluent builder for sending to and reading from Amazon SQS queues with
 * RxJava 2 {@link Flowable}s, optionally storing the message payload in an S3 bucket and
 * placing only the S3 object key on the queue.
 *
 * <p>Start reading with {@link #queueName(String)}; send S3-backed messages with one of the
 * {@code sendToQueueUsingS3} overloads.
 */
public final class Sqs {

    /** Batch size requested per receive call; Amazon SQS accepts values from 1 to 10 only. */
    private static final int MAX_MESSAGES_PER_RECEIVE = 10;

    /** Wait time (seconds) used by the continuous long-polling receive request. */
    private static final int LONG_POLL_WAIT_TIME_SECONDS = 20;

    /** Size of the copy buffer used by {@link #readAndClose(InputStream)}. */
    private static final int READ_BUFFER_SIZE = 8192;

    /** Default logger: discards every message. */
    private static final Consumer<String> DO_NOTHING = t -> {
    };

    private Sqs() {
        // static utility class, not instantiable
    }

    /**
     * Writes {@code message} to S3 under an id produced by {@code s3IdFactory} and then sends that
     * id as the body of an SQS message.
     *
     * <p>If sending to the queue fails with a {@link RuntimeException}, the S3 object just written
     * is deleted again before the failure is reported.
     *
     * @param sqs SQS client, must not be null
     * @param queueUrl URL of the destination queue, must not be null
     * @param s3 S3 client, must not be null
     * @param bucketName bucket that receives the payload, must not be null
     * @param headers entries copied onto the S3 object metadata via {@code setHeader}, must not be null
     * @param message payload bytes, must not be null
     * @param s3IdFactory supplies the S3 object key, must not be null
     * @return the S3 object key that was also sent as the SQS message body
     * @throws NullPointerException if any argument is null
     * @throws RuntimeException wrapping any exception thrown by {@code s3IdFactory}
     * @throws CompositeException if sending the SQS message fails
     */
    public static String sendToQueueUsingS3(AmazonSQS sqs, String queueUrl, AmazonS3 s3, String bucketName, Map<String, String> headers, byte[] message, Callable<String> s3IdFactory) {
        Preconditions.checkNotNull(sqs);
        Preconditions.checkNotNull(s3);
        Preconditions.checkNotNull(queueUrl);
        Preconditions.checkNotNull(bucketName);
        // Fail fast on these two as well, before the id factory runs or anything is written to S3.
        Preconditions.checkNotNull(headers);
        Preconditions.checkNotNull(message);
        Preconditions.checkNotNull(s3IdFactory);
        final String s3Id;
        try {
            s3Id = s3IdFactory.call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(message.length);
        for (Map.Entry<String, String> header : headers.entrySet()) {
            metadata.setHeader(header.getKey(), header.getValue());
        }
        s3.putObject(bucketName, s3Id, new ByteArrayInputStream(message), metadata);
        try {
            sqs.sendMessage(queueUrl, s3Id);
        } catch (RuntimeException e) {
            // The id never reached the queue, so remove the S3 object that was just written.
            try {
                s3.deleteObject(bucketName, s3Id);
                // NOTE(review): this rethrow is caught by the handler directly below, so a send
                // failure always surfaces as a CompositeException, even when the delete succeeded.
                // Left unchanged because callers may rely on that exception type; confirm whether
                // a plain rethrow of the original exception was intended.
                throw e;
            } catch (RuntimeException e2) {
                throw new CompositeException(e, e2);
            }
        }
        return s3Id;
    }

    /**
     * Same as the seven-argument overload, with no extra S3 metadata headers.
     *
     * @return the S3 object key that was also sent as the SQS message body
     */
    public static String sendToQueueUsingS3(AmazonSQS sqs, String queueUrl, AmazonS3 s3, String bucketName, byte[] message, Callable<String> s3IdFactory) {
        return sendToQueueUsingS3(sqs, queueUrl, s3, bucketName, Collections.emptyMap(), message, s3IdFactory);
    }

    /**
     * Same as the seven-argument overload, with no extra S3 metadata headers and a random
     * UUID-based (dashes removed) S3 object key.
     *
     * @return the S3 object key that was also sent as the SQS message body
     */
    public static String sendToQueueUsingS3(AmazonSQS sqs, String queueUrl, AmazonS3 s3, String bucketName, byte[] message) {
        return sendToQueueUsingS3(sqs, queueUrl, s3, bucketName, message, Sqs::randomS3Id);
    }

    /**
     * Same as the seven-argument overload, with a random UUID-based (dashes removed) S3 object key.
     *
     * @return the S3 object key that was also sent as the SQS message body
     */
    public static String sendToQueueUsingS3(AmazonSQS sqs, String queueUrl, AmazonS3 s3, String bucketName, Map<String, String> headers, byte[] message) {
        return sendToQueueUsingS3(sqs, queueUrl, s3, bucketName, headers, message, Sqs::randomS3Id);
    }

    /** Returns a random UUID rendered as a string with the dashes removed. */
    private static String randomS3Id() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /**
     * Starts building a message stream for the named queue.
     *
     * @param queueName name of the queue to read, must not be null
     * @return a new builder
     */
    public static SqsBuilder queueName(String queueName) {
        return new SqsBuilder(queueName);
    }

    /**
     * Creates the message stream described by the arguments. The SQS client is obtained from
     * {@code sqsFactory} through {@code Flowable.using}, whose disposer calls {@code shutdown()}
     * on it.
     *
     * @param waitTimesSeconds when present, each emitted value triggers a polling round with that
     *        wait time; when absent, the queue is long-polled continuously
     * @throws NullPointerException if any argument is null
     */
    static Flowable<SqsMessage> messages(Callable<AmazonSQS> sqsFactory, Optional<Callable<AmazonS3>> s3Factory, String queueName, Optional<String> bucketName, Optional<Flowable<Integer>> waitTimesSeconds, Consumer<? super String> logger) {
        Preconditions.checkNotNull(sqsFactory);
        Preconditions.checkNotNull(s3Factory);
        Preconditions.checkNotNull(queueName);
        Preconditions.checkNotNull(bucketName);
        Preconditions.checkNotNull(waitTimesSeconds);
        Preconditions.checkNotNull(logger);
        return Flowable.using(
                sqsFactory,
                sqs -> createFlowableWithSqs(sqs, s3Factory, sqsFactory, queueName, bucketName, waitTimesSeconds, logger),
                sqs -> sqs.shutdown());
    }

    /**
     * Obtains the optional S3 client from {@code s3Factory} (wrapping any factory exception in a
     * {@link RuntimeException}) through {@code Flowable.using}, whose disposer passes a present
     * client to {@code Util.shutdown}.
     */
    private static Flowable<SqsMessage> createFlowableWithSqs(AmazonSQS sqs, Optional<Callable<AmazonS3>> s3Factory, Callable<AmazonSQS> sqsFactory, String queueName, Optional<String> bucketName, Optional<Flowable<Integer>> waitTimesSeconds, Consumer<? super String> logger) {
        return Flowable.using(
                () -> s3Factory.map(factory -> {
                    try {
                        return factory.call();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }),
                s3 -> createFlowableWithS3(sqs, s3Factory, sqsFactory, queueName, bucketName, s3, waitTimesSeconds, logger),
                s3 -> s3.ifPresent(Util::shutdown));
    }

    /** Chooses between interval-driven polling and continuous long polling. */
    private static Flowable<SqsMessage> createFlowableWithS3(AmazonSQS sqs, Optional<Callable<AmazonS3>> s3Factory, Callable<AmazonSQS> sqsFactory, String queueName, Optional<String> bucketName, Optional<AmazonS3> s3, Optional<Flowable<Integer>> waitTimesSeconds, Consumer<? super String> logger) {
        if (waitTimesSeconds.isPresent()) {
            // The polling path builds its own Service instance.
            return createFlowablePolling(sqs, s3Factory, sqsFactory, queueName, bucketName, s3, waitTimesSeconds.get(), logger);
        }
        SqsMessage.Service service = new SqsMessage.Service(s3Factory, sqsFactory, s3, sqs, queueName, bucketName);
        return createFlowableContinousLongPolling(sqs, queueName, bucketName, s3, service, logger);
    }

    /**
     * Runs one polling round (see {@code get}) per value emitted by {@code waitTimesSeconds},
     * with at most one round subscribed at a time ({@code flatMap} max concurrency of 1).
     */
    private static Flowable<SqsMessage> createFlowablePolling(AmazonSQS sqs, Optional<Callable<AmazonS3>> s3Factory, Callable<AmazonSQS> sqsFactory, String queueName, Optional<String> bucketName, Optional<AmazonS3> s3, Flowable<Integer> waitTimesSeconds, Consumer<? super String> logger) {
        SqsMessage.Service service = new SqsMessage.Service(s3Factory, sqsFactory, s3, sqs, queueName, bucketName);
        return waitTimesSeconds.flatMap(n -> get(sqs, queueName, bucketName, s3, service, n, logger), 1);
    }

    /**
     * One polling round: resolves the queue URL, performs a receive with {@code waitTimeSeconds},
     * then keeps receiving with a wait time of zero until a receive yields no entries. Messages
     * for which {@code getNextMessage} returns empty are dropped from the output.
     */
    private static Flowable<SqsMessage> get(AmazonSQS sqs, String queueName, Optional<String> bucketName, Optional<AmazonS3> s3, SqsMessage.Service service, int waitTimeSeconds, Consumer<? super String> logger) {
        return Flowable.defer(() -> {
            String queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
            // The receive request takes the queue URL, not the queue name.
            return Flowable
                    .just(sqs.receiveMessage(request(queueUrl, waitTimeSeconds))
                            .getMessages()
                            .stream()
                            .map(m -> getNextMessage(m, queueUrl, bucketName, s3, sqs, service))
                            .collect(Collectors.toList()))
                    .concatWith(Flowable
                            .defer(() -> Flowable.just(sqs.receiveMessage(request(queueUrl, 0))
                                    .getMessages()
                                    .stream()
                                    .map(m -> getNextMessage(m, queueUrl, bucketName, s3, sqs, service))
                                    .collect(Collectors.toList())))
                            .repeat())
                    .takeWhile(list -> !list.isEmpty())
                    .flatMapIterable(list -> list)
                    .filter(opt -> opt.isPresent())
                    .map(opt -> opt.get());
        });
    }

    /** Builds a generator-backed stream that long-polls the queue until a message is available. */
    private static Flowable<SqsMessage> createFlowableContinousLongPolling(AmazonSQS sqs, String queueName, Optional<String> bucketName, Optional<AmazonS3> s3, SqsMessage.Service service, Consumer<? super String> logger) {
        ContinuousLongPollingSyncOnSubscribe c = new ContinuousLongPollingSyncOnSubscribe(sqs, queueName, s3, bucketName, service, logger);
        // c acts as both the per-subscription state factory and the generator callback.
        return Flowable.generate(c, c);
    }

    /**
     * Converts a raw SQS message into an {@link SqsMessage}.
     *
     * <p>Without a bucket name, the message body (UTF-8 bytes) is the payload and the timestamp is
     * the current time. With a bucket name, the body is treated as an S3 object key: the payload
     * is the object's content and the timestamp is its last-modified time. If that object does not
     * exist, the SQS message is deleted from the queue and empty is returned.
     *
     * @return the converted message, or empty if the referenced S3 object does not exist
     */
    static Optional<SqsMessage> getNextMessage(Message message, String queueUrl, Optional<String> bucketName, Optional<AmazonS3> s3, AmazonSQS sqs, SqsMessage.Service service) {
        if (!bucketName.isPresent()) {
            SqsMessage direct = new SqsMessage(message.getReceiptHandle(), message.getBody().getBytes(StandardCharsets.UTF_8), System.currentTimeMillis(), Optional.empty(), service);
            return Optional.of(direct);
        }
        String s3Id = message.getBody();
        if (!s3.get().doesObjectExist(bucketName.get(), s3Id)) {
            // Nothing to deliver for this id, so remove the dangling queue entry.
            sqs.deleteMessage(queueUrl, message.getReceiptHandle());
            return Optional.empty();
        }
        S3Object object = s3.get().getObject(bucketName.get(), s3Id);
        byte[] content = readAndClose(object.getObjectContent());
        long timestamp = object.getObjectMetadata().getLastModified().getTime();
        SqsMessage viaS3 = new SqsMessage(message.getReceiptHandle(), content, timestamp, Optional.of(s3Id), service);
        return Optional.of(viaS3);
    }

    /**
     * Builds a receive request for the polling path.
     *
     * @param queueUrl URL of the queue to receive from
     * @param waitTimeSeconds wait time to set on the request
     */
    private static ReceiveMessageRequest request(String queueUrl, int waitTimeSeconds) {
        return new ReceiveMessageRequest(queueUrl)
                .withMaxNumberOfMessages(MAX_MESSAGES_PER_RECEIVE)
                .withWaitTimeSeconds(waitTimeSeconds);
    }

    /**
     * Reads {@code is} to the end, closes it, and returns everything that was read.
     *
     * @param is stream to drain, must not be null; it is closed before this method returns,
     *        whether or not reading succeeds
     * @return all bytes read from the stream
     * @throws NullPointerException if {@code is} is null
     * @throws RuntimeException wrapping any {@link IOException} raised while reading or closing
     */
    static byte[] readAndClose(InputStream is) {
        Preconditions.checkNotNull(is);
        try (InputStream in = is) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buffer = new byte[READ_BUFFER_SIZE];
            int n;
            while ((n = in.read(buffer)) != -1) {
                bos.write(buffer, 0, n);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Compiler-generated style accessor for the no-op logger; retained for compatibility. */
    static /* synthetic */ Consumer<String> access$000() {
        return DO_NOTHING;
    }

    /** Per-subscription generator state: received messages not yet emitted. */
    private static final class State {
        final Queue<Message> queue;

        public State(Queue<Message> queue) {
            this.queue = queue;
        }
    }

    /**
     * State factory and generator callback for the continuous long-polling stream. {@link #call()}
     * resolves the queue URL and prepares the receive request; each
     * {@link #accept(State, Emitter)} emits exactly one message, receiving from SQS whenever the
     * buffered queue is empty.
     */
    private static final class ContinuousLongPollingSyncOnSubscribe
            implements Callable<State>, BiConsumer<State, Emitter<SqsMessage>> {
        private final AmazonSQS sqs;
        private final String queueName;
        private final Optional<AmazonS3> s3;
        private final Optional<String> bucketName;
        private final SqsMessage.Service service;
        private final Consumer<? super String> logger;
        // Assigned in call(), which runs before accept() is invoked for a subscription.
        private ReceiveMessageRequest request;
        private String queueUrl;

        public ContinuousLongPollingSyncOnSubscribe(AmazonSQS sqs, String queueName, Optional<AmazonS3> s3, Optional<String> bucketName, SqsMessage.Service service, Consumer<? super String> logger) {
            this.sqs = sqs;
            this.queueName = queueName;
            this.s3 = s3;
            this.bucketName = bucketName;
            this.service = service;
            this.logger = logger;
        }

        /** Resolves the queue URL, builds the long-poll request and returns an empty state. */
        @Override
        public State call() {
            this.queueUrl = this.sqs.getQueueUrl(this.queueName).getQueueUrl();
            this.request = new ReceiveMessageRequest(this.queueUrl)
                    .withWaitTimeSeconds(LONG_POLL_WAIT_TIME_SECONDS)
                    .withMaxNumberOfMessages(MAX_MESSAGES_PER_RECEIVE);
            return new State(new LinkedList<Message>());
        }

        /**
         * Emits the next deliverable message, looping (and receiving again as needed) past any
         * message for which {@code getNextMessage} returns empty.
         */
        @Override
        public void accept(State state, Emitter<SqsMessage> emitter) throws Exception {
            Queue<Message> pending = state.queue;
            Optional<SqsMessage> next = Optional.empty();
            while (!next.isPresent()) {
                while (pending.isEmpty()) {
                    this.logger.accept("long polling for messages on queueName=" + this.queueName);
                    ReceiveMessageResult result = this.sqs.receiveMessage(this.request);
                    pending.addAll(result.getMessages());
                }
                Message message = pending.poll();
                next = getNextMessage(message, this.queueUrl, this.bucketName, this.s3, this.sqs, this.service);
            }
            emitter.onNext(next.get());
        }
    }

    /** Builder step reached after a bucket name is given; requires the S3 client factory next. */
    public static final class ViaS3Builder {
        private final SqsBuilder sqsBuilder;

        public ViaS3Builder(SqsBuilder sqsBuilder) {
            this.sqsBuilder = sqsBuilder;
        }

        /**
         * Sets the factory used to obtain the S3 client.
         *
         * @param s3Factory S3 client factory, must not be null
         * @return the underlying queue builder
         */
        public SqsBuilder s3Factory(Callable<AmazonS3> s3Factory) {
            this.sqsBuilder.s3 = Optional.of(s3Factory);
            return this.sqsBuilder;
        }
    }

    /** Fluent builder for a {@code Flowable<SqsMessage>} over a single queue. */
    public static final class SqsBuilder {
        private final String queueName;
        private Callable<AmazonSQS> sqs = null;
        private Optional<Callable<AmazonS3>> s3 = Optional.empty();
        private Optional<String> bucketName = Optional.empty();
        private Optional<Flowable<Integer>> waitTimesSeconds = Optional.empty();
        private Consumer<? super String> logger = DO_NOTHING;

        SqsBuilder(String queueName) {
            Preconditions.checkNotNull(queueName);
            this.queueName = queueName;
        }

        /**
         * Treats message bodies as keys of objects in the given S3 bucket.
         *
         * @param bucketName bucket holding the payloads, must not be null
         * @return the builder step that asks for the S3 client factory
         */
        public ViaS3Builder bucketName(String bucketName) {
            this.bucketName = Optional.of(bucketName);
            return new ViaS3Builder(this);
        }

        /** Sets the factory used to obtain the SQS client. */
        public SqsBuilder sqsFactory(Callable<AmazonSQS> sqsFactory) {
            this.sqs = sqsFactory;
            return this;
        }

        /**
         * Switches to interval-driven polling: each emitted value triggers one polling round using
         * that value as the wait time. Every value is rounded with {@link Math#round(double)} and
         * then converted from {@code unit} to whole seconds.
         */
        public SqsBuilder waitTimes(Flowable<? extends Number> waitTimesSeconds, TimeUnit unit) {
            this.waitTimesSeconds = Optional.of(waitTimesSeconds.map(x -> (int) unit.toSeconds(Math.round(x.doubleValue()))));
            return this;
        }

        /**
         * Polls with a wait time of zero once immediately and then once per tick of
         * {@code Flowable.interval(interval, unit, scheduler)}.
         */
        public SqsBuilder interval(int interval, TimeUnit unit, Scheduler scheduler) {
            Flowable<Integer> zeroWaits = Flowable.just(0)
                    .concatWith(Flowable.interval(interval, unit, scheduler).map(x -> 0));
            return this.waitTimes(zeroWaits, TimeUnit.SECONDS);
        }

        /** Same as {@link #interval(int, TimeUnit, Scheduler)} using {@code Schedulers.io()}. */
        public SqsBuilder interval(int interval, TimeUnit unit) {
            return this.interval(interval, unit, Schedulers.io());
        }

        /** Sets the consumer that receives log lines; the default discards them. */
        public SqsBuilder logger(Consumer<? super String> logger) {
            this.logger = logger;
            return this;
        }

        /**
         * Builds the message stream from the current settings.
         *
         * @throws NullPointerException if no SQS factory or a null logger has been set
         */
        public Flowable<SqsMessage> messages() {
            return Sqs.messages(this.sqs, this.s3, this.queueName, this.bucketName, this.waitTimesSeconds, this.logger);
        }
    }
}

