package org.apache.beam.sdk.io.aws.sqs;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.MessageSystemAttributeName;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BucketingFunction;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.EvictingQueue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader.class */
public class SqsUnboundedReader extends UnboundedSource.UnboundedReader<Message> {
    /** Number of messages requested from SQS per receive call (see {@code pull()}). */
    public static final int MAX_NUMBER_OF_MESSAGES = 10;
    /**
     * Attempt limit for the batch delete / batch visibility-change loops; once reached they throw
     * an {@link IOException}. NOTE(review): the name looks like a typo for "RETRIES"; it is kept
     * because other members reference it.
     */
    private static final int BATCH_OPERATION_MAX_RETIRES = 5;
    /** Percentage of the queue visibility timeout by which {@code extend()} pushes a deadline out. */
    private static final int VISIBILITY_EXTENSION_PCT = 50;
    /**
     * Percentage of the queue visibility timeout used as lead time: {@code extend()} starts acting
     * on a message once its deadline is closer than this.
     */
    private static final int VISIBILITY_SAFETY_PCT = 20;
    /** Number of message ids handled per batch request (see {@code delete()} and {@code extend()}). */
    private static final int DELETE_BATCH_SIZE = 10;
    /** {@code pull()} does not request further messages while this many are tracked as in flight. */
    private static final int MAX_IN_FLIGHT = 20000;
    /** Number of recent message sizes retained for the average-size estimate. */
    private static final int MAX_AVG_BYTE_MESSAGES = 20;
    // Passed to BucketingFunction / MovingFunction as their sample-count argument.
    // Presumably the number of samples needed before a value counts as significant -- TODO confirm.
    private static final int MIN_WATERMARK_MESSAGES = 10;
    // Passed to BucketingFunction / MovingFunction as their bucket-spread argument.
    // Presumably the number of distinct buckets needed for significance -- TODO confirm.
    private static final int MIN_WATERMARK_SPREAD = 2;
    /** The source this reader was created for; provides the SQS client and the queue URL. */
    private final SqsUnboundedSource source;
    private static final Logger LOG = LoggerFactory.getLogger(SqsUnboundedReader.class);
    /** Once a message has been in flight longer than this, {@code extend()} stops extending it. */
    private static final Duration PROCESSING_TIMEOUT = Duration.standardMinutes(2);
    /** A message whose deadline is closer than this is treated by {@code extend()} as already expired. */
    private static final Duration VISIBILITY_TOO_LATE = Duration.standardSeconds(2);
    /** Sample period handed to every {@link MovingFunction}; also used by {@code getWatermark()}. */
    private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);
    /** Update period handed to the {@link MovingFunction}s and the {@link BucketingFunction}. */
    private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);
    /** Minimum interval between two statistics log lines written by {@code stats()}. */
    private static final Duration LOG_PERIOD = Duration.standardSeconds(30);
    /** Combiner that keeps the smaller of two longs; its identity element is {@link Long#MAX_VALUE}. */
    private static final Combine.BinaryCombineLongFn MIN = new Combine.BinaryCombineLongFn() {
        public long apply(long left, long right) {
            return left <= right ? left : right;
        }

        public long identity() {
            return Long.MAX_VALUE;
        }
    };
    /** Combiner that keeps the larger of two longs; its identity element is {@link Long#MIN_VALUE}. */
    private static final Combine.BinaryCombineLongFn MAX = new Combine.BinaryCombineLongFn() {
        public long apply(long left, long right) {
            return left >= right ? left : right;
        }

        public long identity() {
            return Long.MIN_VALUE;
        }
    };
    /** Combiner that adds two longs. */
    private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs();

    /** True until {@code close()} is called. */
    private AtomicBoolean active = new AtomicBoolean(true);
    /** Messages received from SQS that {@code advance()} has not handed out yet. */
    final Queue<Message> messagesNotYetRead = new ArrayDeque<>();
    /** Ids of messages that have been handed out by {@code advance()} and not yet retired. */
    private Set<String> safeToDeleteIds = new HashSet<>();
    /** Per-message visibility bookkeeping keyed by message id, in insertion order. */
    private final LinkedHashMap<String, InFlightState> inFlight = new LinkedHashMap<>();
    /** Batches of deleted message ids waiting to be removed from the bookkeeping by {@code retire()}. */
    private final Queue<List<String>> deletedIds = new ConcurrentLinkedQueue<>();
    /** Queue visibility timeout in milliseconds; -1 until {@code start()} has read it. */
    private long visibilityTimeoutMs = -1;
    /** Total UTF-8 body size of the messages in {@link #messagesNotYetRead}. */
    private long notYetReadBytes = 0;
    /** Body sizes of the most recently read messages, used for the average-size estimate. */
    private EvictingQueue<Integer> recentMessageBytes = EvictingQueue.create(MAX_AVG_BYTE_MESSAGES);
    /** Minimum timestamp over received-but-unread messages, bucketed by request time. */
    private BucketingFunction minUnreadTimestampMsSinceEpoch = new BucketingFunction(SAMPLE_UPDATE.getMillis(), MIN_WATERMARK_SPREAD, MIN_WATERMARK_MESSAGES, MIN);
    /** Minimum timestamp over recently read messages. */
    private MovingFunction minReadTimestampMsSinceEpoch = newFun(MIN);
    /** Count of recent receive calls that returned no messages. */
    private MovingFunction numEmptyReceives = newFun(SUM);
    /** Time of the last receive call that returned messages; -1 if there has been none. */
    private long lastReceivedMsSinceEpoch = -1;
    /** Watermark most recently computed by {@code getWatermark()}. */
    private long lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
    /** Message currently exposed through the reader, or null. */
    private Message current = null;
    /** Time of the last statistics log line; -1 before the first call to {@code stats()}. */
    private long lastLogTimestampMsSinceEpoch = -1;

    // Counters and gauges below feed the statistics log line written by stats().
    private long numReceived = 0;
    private MovingFunction numReceivedRecently = newFun(SUM);
    private MovingFunction numExtendedDeadlines = newFun(SUM);
    private MovingFunction numLateDeadlines = newFun(SUM);
    private MovingFunction numDeleted = newFun(SUM);
    private MovingFunction numExpired = newFun(SUM);
    private MovingFunction numReleased = newFun(SUM);
    private MovingFunction numReadBytes = newFun(SUM);
    private MovingFunction minReceivedTimestampMsSinceEpoch = newFun(MIN);
    private MovingFunction maxReceivedTimestampMsSinceEpoch = newFun(MAX);
    private MovingFunction minWatermarkMsSinceEpoch = newFun(MIN);
    private MovingFunction maxWatermarkMsSinceEpoch = newFun(MAX);
    private MovingFunction numLateMessages = newFun(SUM);
    /** Number of checkpoint marks handed out by {@code getCheckpointMark()} that are still outstanding. */
    AtomicInteger numInFlightCheckpoints = new AtomicInteger();
    /** Largest value {@link #numInFlightCheckpoints} has reached when a checkpoint was taken. */
    private int maxInFlightCheckpoints = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReader$InFlightState.class */
    /** Visibility bookkeeping for one received message that has not been retired yet. */
    public static class InFlightState {
        /** Receipt handle SQS issued for the message. */
        String receiptHandle;
        /** Time (ms since epoch) of the receive request that returned the message. */
        long requestTimeMsSinceEpoch;
        /** Locally tracked time (ms since epoch) at which the message's visibility timeout ends. */
        long visibilityDeadlineMsSinceEpoch;

        /**
         * @param receiptHandle receipt handle of the message
         * @param requestTimeMsSinceEpoch time of the receive request, in ms since epoch
         * @param visibilityDeadlineMsSinceEpoch tracked visibility deadline, in ms since epoch
         */
        public InFlightState(String receiptHandle, long requestTimeMsSinceEpoch, long visibilityDeadlineMsSinceEpoch) {
            this.visibilityDeadlineMsSinceEpoch = visibilityDeadlineMsSinceEpoch;
            this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
            this.receiptHandle = receiptHandle;
        }
    }

    /**
     * Creates a {@link MovingFunction} over {@code SAMPLE_PERIOD}, updated every
     * {@code SAMPLE_UPDATE}, using the shared watermark spread / message-count settings.
     *
     * @param binaryCombineLongFn combiner applied to the samples (MIN, MAX or SUM in this class)
     * @return a new, empty moving function
     */
    private static MovingFunction newFun(Combine.BinaryCombineLongFn binaryCombineLongFn) {
        return new MovingFunction(
            SAMPLE_PERIOD.getMillis(),
            SAMPLE_UPDATE.getMillis(),
            MIN_WATERMARK_SPREAD,
            MIN_WATERMARK_MESSAGES,
            binaryCombineLongFn);
    }

    public SqsUnboundedReader(SqsUnboundedSource sqsUnboundedSource, SqsCheckpointMark sqsCheckpointMark) throws IOException {
        this.source = sqsUnboundedSource;
        if (sqsCheckpointMark != null) {
            long now = now();
            extendBatch(now, sqsCheckpointMark.notYetReadReceipts, 0);
            this.numReleased.add(now, sqsCheckpointMark.notYetReadReceipts.size());
        }
    }

    /**
     * Computes and records the current watermark.
     *
     * <p>If there are no read or unread timestamps in the sample window, at least one recent
     * receive came back empty, and nothing has been received for longer than
     * {@code SAMPLE_PERIOD}, the watermark becomes the current time. Otherwise, if either
     * timestamp tracker reports a significant sample, the watermark becomes the smaller of the two
     * minima. In every other case the previous watermark is kept.
     *
     * @return the watermark as an {@link Instant}
     */
    public Instant getWatermark() {
        final long nowMs = now();
        final long oldestRead = minReadTimestampMsSinceEpoch.get(nowMs);
        final long oldestUnread = minUnreadTimestampMsSinceEpoch.get();

        final boolean noTimestamps = oldestRead == Long.MAX_VALUE && oldestUnread == Long.MAX_VALUE;
        final boolean queueLooksIdle =
            noTimestamps
                && numEmptyReceives.get(nowMs) > 0
                && nowMs > lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis();

        if (queueLooksIdle) {
            lastWatermarkMsSinceEpoch = nowMs;
        } else if (minReadTimestampMsSinceEpoch.isSignificant()
            || minUnreadTimestampMsSinceEpoch.isSignificant()) {
            lastWatermarkMsSinceEpoch = Math.min(oldestRead, oldestUnread);
        }

        // Track the range of reported watermarks for the statistics log line.
        minWatermarkMsSinceEpoch.add(nowMs, lastWatermarkMsSinceEpoch);
        maxWatermarkMsSinceEpoch.add(nowMs, lastWatermarkMsSinceEpoch);
        return new Instant(lastWatermarkMsSinceEpoch);
    }

    /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
    /**
     * Returns the message the reader is currently positioned on.
     *
     * <p>This restores the original method name (the decompiler renamed it to
     * {@code m32getCurrent}), so the reader's inherited {@code getCurrent()} contract is
     * implemented again.
     *
     * @return the current message
     * @throws NoSuchElementException if there is no current message
     */
    public Message getCurrent() throws NoSuchElementException {
        if (this.current == null) {
            throw new NoSuchElementException();
        }
        return this.current;
    }

    /**
     * Decompiler-generated alias of {@link #getCurrent()}, kept so existing callers of the renamed
     * method keep working.
     *
     * @return the current message
     * @throws NoSuchElementException if there is no current message
     */
    public Message m32getCurrent() throws NoSuchElementException {
        return getCurrent();
    }

    /**
     * Returns the timestamp of the current message, as derived by {@code getTimestamp}.
     *
     * @return timestamp of the current message
     * @throws NoSuchElementException if there is no current message
     */
    public Instant getCurrentTimestamp() throws NoSuchElementException {
        final Message message = this.current;
        if (message != null) {
            return getTimestamp(message);
        }
        throw new NoSuchElementException();
    }

    /**
     * Returns the id of the current message as UTF-8 bytes.
     *
     * @return UTF-8 encoding of the current message's id
     * @throws NoSuchElementException if there is no current message
     */
    public byte[] getCurrentRecordId() throws NoSuchElementException {
        final Message message = this.current;
        if (message != null) {
            final String messageId = message.getMessageId();
            return messageId.getBytes(StandardCharsets.UTF_8);
        }
        throw new NoSuchElementException();
    }

    /**
     * Takes a snapshot for checkpointing: the ids currently safe to delete and the receipt handles
     * of messages that were received but not yet read. Also bumps the in-flight checkpoint counter
     * and records its high-water mark.
     *
     * @return a new {@code SqsCheckpointMark} bound to this reader
     */
    public UnboundedSource.CheckpointMark getCheckpointMark() {
        final int outstanding = numInFlightCheckpoints.incrementAndGet();
        maxInFlightCheckpoints = Math.max(maxInFlightCheckpoints, outstanding);

        final List<String> safeToDeleteSnapshot = new ArrayList<>(safeToDeleteIds);
        final List<String> unreadReceiptSnapshot = new ArrayList<>(messagesNotYetRead.size());
        for (Message unread : messagesNotYetRead) {
            unreadReceiptSnapshot.add(unread.getReceiptHandle());
        }
        return new SqsCheckpointMark(this, safeToDeleteSnapshot, unreadReceiptSnapshot);
    }

    /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
    /**
     * Returns the source this reader was created from.
     *
     * <p>This restores the original method name (the decompiler renamed it to
     * {@code m31getCurrentSource}), so the reader's inherited {@code getCurrentSource()} contract
     * is implemented again.
     *
     * @return the owning source
     */
    public SqsUnboundedSource getCurrentSource() {
        return this.source;
    }

    /**
     * Decompiler-generated alias of {@link #getCurrentSource()}, kept so existing callers of the
     * renamed method keep working.
     *
     * @return the owning source
     */
    public SqsUnboundedSource m31getCurrentSource() {
        return getCurrentSource();
    }

    /**
     * Estimates the backlog size as (approximate number of queued messages) x (average size of
     * recently read messages). The message count comes from the queue's
     * {@code ApproximateNumberOfMessages} attribute, fetched with a call to SQS.
     *
     * @return the estimate in bytes, or -1 when the queue reports messages but no message has been
     *     read yet to base an average on
     */
    public long getTotalBacklogBytes() {
        final long averageBytes = avgMessageBytes();
        final String attributeName = QueueAttributeName.ApproximateNumberOfMessages.toString();
        final Map<String, String> attributes =
            source.getSqs()
                .getQueueAttributes(source.getRead().queueUrl(), Collections.singletonList(attributeName))
                .getAttributes();
        final long approximateMessages = Long.parseLong(attributes.get(attributeName));

        if (averageBytes == -1 && approximateMessages > 0) {
            return -1L;
        }
        return approximateMessages * averageBytes;
    }

    /**
     * Reads the queue's {@code VisibilityTimeout} attribute (seconds), stores it in milliseconds,
     * and then advances to the first message.
     *
     * @return true if a message is available, as reported by {@link #advance()}
     * @throws IOException if advancing fails
     */
    public boolean start() throws IOException {
        final GetQueueAttributesRequest request =
            new GetQueueAttributesRequest(this.source.getRead().queueUrl())
                .withAttributeNames("VisibilityTimeout");
        final String timeoutSeconds =
            this.source.getSqs().getQueueAttributes(request).getAttributes().get("VisibilityTimeout");
        // Multiply as long so the seconds-to-milliseconds conversion cannot overflow int.
        this.visibilityTimeoutMs = Integer.parseInt(timeoutSeconds) * 1000L;
        return advance();
    }

    /**
     * Moves the reader to the next message.
     *
     * <p>In order: writes periodic statistics, drops the previous message from the unread-timestamp
     * tracker, retires deleted messages, extends visibility timeouts that are about to lapse, pulls
     * a new batch if nothing is buffered, and finally makes the next buffered message current.
     *
     * @return true if a message is now current, false if none is available
     * @throws IOException if extending visibility timeouts fails
     */
    public boolean advance() throws IOException {
        stats();

        if (this.current != null) {
            // The previous message is consumed; remove its sample from the unread tracker.
            this.minUnreadTimestampMsSinceEpoch.remove(getRequestTimeMsSinceEpoch(this.current).longValue());
            this.current = null;
        }

        retire();
        extend();

        if (this.messagesNotYetRead.isEmpty()) {
            pull();
        }

        this.current = this.messagesNotYetRead.poll();
        if (this.current == null) {
            return false;
        }

        // Encode the body once; the size feeds three separate statistics.
        final int bodyBytes = this.current.getBody().getBytes(StandardCharsets.UTF_8).length;
        this.notYetReadBytes -= bodyBytes;
        Preconditions.checkState(this.notYetReadBytes >= 0);

        final long nowMs = now();
        this.numReadBytes.add(nowMs, bodyBytes);
        this.recentMessageBytes.add(bodyBytes);

        // Parse the timestamp once; it is used for the watermark tracker and lateness check.
        final long timestampMs = getCurrentTimestamp().getMillis();
        this.minReadTimestampMsSinceEpoch.add(nowMs, timestampMs);
        if (timestampMs < this.lastWatermarkMsSinceEpoch) {
            this.numLateMessages.add(nowMs, 1L);
        }

        this.safeToDeleteIds.add(this.current.getMessageId());
        return true;
    }

    /**
     * Marks the reader inactive and shuts the SQS client down if no checkpoints are outstanding
     * (see {@link #maybeCloseClient()}).
     *
     * @throws IOException declared by {@link #maybeCloseClient()}
     */
    public void close() throws IOException {
        active.set(false);
        maybeCloseClient();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Shuts the SQS client down, but only when the reader has been closed, no checkpoints are
     * outstanding, and the source actually has a client.
     *
     * @throws IOException declared for callers; nothing in this body throws it directly
     */
    public void maybeCloseClient() throws IOException {
        final boolean closedAndIdle = !active.get() && numInFlightCheckpoints.get() == 0;
        if (closedAndIdle) {
            final AmazonSQS client = source.getSqs();
            if (client != null) {
                client.shutdown();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Deletes the given messages from SQS, splitting the ids into groups of ten and deleting one
     * group per batch request.
     *
     * @param list ids of the messages to delete
     * @throws IOException if a batch still has failures after the retry limit
     */
    public void delete(List<String> list) throws IOException {
        final AtomicInteger position = new AtomicInteger();
        // Consecutive ids share a group key: positions 0-9 map to 0, 10-19 to 1, and so on.
        final Map<Integer, List<String>> batches =
            list.stream().collect(Collectors.groupingBy(id -> position.getAndIncrement() / 10));
        for (List<String> batch : batches.values()) {
            deleteBatch(batch);
        }
    }

    /**
     * Deletes one batch of messages from SQS, retrying entries that fail.
     *
     * <p>Only ids still tracked in {@code inFlight} are sent. Failures with code
     * {@code ReceiptHandleIsInvalid} are not retried; any other failure is retried until
     * {@code BATCH_OPERATION_MAX_RETIRES} attempts have been made. On success the whole input list
     * is queued for {@code retire()}.
     *
     * @param messageIds ids of the messages in this batch
     * @throws IOException if entries are still failing once the attempt limit is reached
     */
    private void deleteBatch(List<String> messageIds) throws IOException {
        // Batch entry id (position in the input list, as a string) -> receipt handle.
        final Map<String, String> pendingReceipts =
            IntStream.range(0, messageIds.size())
                .boxed()
                .filter(index -> this.inFlight.containsKey(messageIds.get(index)))
                .collect(Collectors.toMap(
                    Object::toString,
                    index -> this.inFlight.get(messageIds.get(index)).receiptHandle));

        List<String> errorMessages = new ArrayList<>();
        int attempts = 0;
        while (!pendingReceipts.isEmpty()) {
            if (attempts >= BATCH_OPERATION_MAX_RETIRES) {
                throw new IOException("Failed to delete " + pendingReceipts.size() + " messages after " + attempts + " retries: " + String.join(", ", errorMessages));
            }

            final List<DeleteMessageBatchRequestEntry> entries =
                pendingReceipts.entrySet().stream()
                    .map(entry -> new DeleteMessageBatchRequestEntry(entry.getKey(), entry.getValue()))
                    .collect(Collectors.toList());

            // Collect ids and messages of the failures worth retrying in one pass.
            final Set<String> retryIds = new HashSet<>();
            final List<String> retryMessages = new ArrayList<>();
            this.source.getSqs()
                .deleteMessageBatch(this.source.getRead().queueUrl(), entries)
                .getFailed()
                .stream()
                .filter(failure -> !failure.getCode().equals("ReceiptHandleIsInvalid"))
                .forEach(failure -> {
                    retryIds.add(failure.getId());
                    retryMessages.add(failure.getMessage());
                });

            pendingReceipts.keySet().retainAll(retryIds);
            errorMessages = retryMessages;
            attempts++;
        }
        this.deletedIds.add(messageIds);
    }

    /**
     * Drains the queue of deleted id batches: counts each batch in the deletion statistics and
     * removes its ids from both the in-flight map and the safe-to-delete set.
     */
    private void retire() {
        final long nowMs = now();
        for (List<String> batch = deletedIds.poll(); batch != null; batch = deletedIds.poll()) {
            numDeleted.add(nowMs, batch.size());
            for (String messageId : batch) {
                inFlight.remove(messageId);
                safeToDeleteIds.remove(messageId);
            }
        }
    }

    /**
     * Requests up to {@code MAX_NUMBER_OF_MESSAGES} messages from SQS and buffers them.
     *
     * <p>Does nothing while {@code MAX_IN_FLIGHT} or more messages are tracked as in flight. An
     * empty response is only counted. Each received message is stamped with the request time,
     * added to the unread queue and the in-flight map, and recorded in the statistics.
     */
    private void pull() {
        if (this.inFlight.size() >= MAX_IN_FLIGHT) {
            return;
        }

        final long requestTimeMs = now();
        final long deadlineMs = requestTimeMs + this.visibilityTimeoutMs;

        final ReceiveMessageRequest request = new ReceiveMessageRequest(this.source.getRead().queueUrl());
        request.setMaxNumberOfMessages(MAX_NUMBER_OF_MESSAGES);
        request.setAttributeNames(Arrays.asList(MessageSystemAttributeName.SentTimestamp.toString()));

        final List<Message> messages = this.source.getSqs().receiveMessage(request).getMessages();
        if (messages == null || messages.isEmpty()) {
            this.numEmptyReceives.add(requestTimeMs, 1L);
            return;
        }

        this.lastReceivedMsSinceEpoch = requestTimeMs;
        for (Message message : messages) {
            setRequestTimeMsSinceEpoch(message, requestTimeMs);
            this.messagesNotYetRead.add(message);
            this.notYetReadBytes += message.getBody().getBytes(StandardCharsets.UTF_8).length;
            this.inFlight.put(
                message.getMessageId(),
                new InFlightState(message.getReceiptHandle(), requestTimeMs, deadlineMs));
            this.numReceived++;
            this.numReceivedRecently.add(requestTimeMs, 1L);

            // Parse the timestamp once; the same value feeds all three trackers.
            final long timestampMs = getTimestamp(message).getMillis();
            this.minReceivedTimestampMsSinceEpoch.add(requestTimeMs, timestampMs);
            this.maxReceivedTimestampMsSinceEpoch.add(requestTimeMs, timestampMs);
            this.minUnreadTimestampMsSinceEpoch.add(requestTimeMs, timestampMs);
        }
    }

    /**
     * Returns the current wall-clock time; every timestamp taken by this reader goes through here.
     *
     * @return milliseconds since the epoch, from {@link System#currentTimeMillis()}
     */
    long now() {
        final long currentMillis = System.currentTimeMillis();
        return currentMillis;
    }

    /**
     * Extends the visibility timeout of in-flight messages whose deadline is getting close.
     *
     * <p>Each pass walks {@code inFlight} in iteration order and sorts entries into three groups:
     * <ul>
     *   <li>deadline closer than {@code VISIBILITY_TOO_LATE}: assumed already expired and dropped
     *       from the bookkeeping;</li>
     *   <li>in flight for longer than {@code PROCESSING_TIMEOUT}: given up on and dropped;</li>
     *   <li>otherwise: extended, at most {@code DELETE_BATCH_SIZE} per pass.</li>
     * </ul>
     * The walk stops at the first entry whose deadline is still further away than the safety
     * margin. This relies on entries being ordered by increasing deadline, which the re-insertion
     * below maintains. Passes repeat until one finds nothing to do.
     *
     * @throws IOException if the visibility change still fails after the retry limit
     */
    private void extend() throws IOException {
        while (true) {
            final long nowMs = now();
            final List<String> assumeExpired = new ArrayList<>();
            final List<String> toBeExtended = new ArrayList<>();
            final List<String> toBeExpired = new ArrayList<>();

            final long safetyMarginMs = (this.visibilityTimeoutMs * VISIBILITY_SAFETY_PCT) / 100;
            for (Map.Entry<String, InFlightState> entry : this.inFlight.entrySet()) {
                final InFlightState state = entry.getValue();
                if (state.visibilityDeadlineMsSinceEpoch - safetyMarginMs > nowMs) {
                    // This deadline is still comfortably in the future; stop the walk here.
                    break;
                }
                if (state.visibilityDeadlineMsSinceEpoch - VISIBILITY_TOO_LATE.getMillis() < nowMs) {
                    // Too close to (or past) the deadline to extend reliably.
                    assumeExpired.add(entry.getKey());
                    continue;
                }
                if (state.requestTimeMsSinceEpoch + PROCESSING_TIMEOUT.getMillis() < nowMs) {
                    // In flight for too long; stop extending it.
                    toBeExpired.add(entry.getKey());
                    continue;
                }
                toBeExtended.add(entry.getKey());
                if (toBeExtended.size() >= DELETE_BATCH_SIZE) {
                    // Enough for one batch request.
                    break;
                }
            }

            if (assumeExpired.isEmpty() && toBeExtended.isEmpty() && toBeExpired.isEmpty()) {
                return;
            }

            if (!assumeExpired.isEmpty()) {
                this.numLateDeadlines.add(nowMs, assumeExpired.size());
                for (String messageId : assumeExpired) {
                    this.inFlight.remove(messageId);
                }
            }

            if (!toBeExpired.isEmpty()) {
                this.numExpired.add(nowMs, toBeExpired.size());
                for (String messageId : toBeExpired) {
                    this.inFlight.remove(messageId);
                }
            }

            if (!toBeExtended.isEmpty()) {
                final long extensionMs = (int) ((this.visibilityTimeoutMs * VISIBILITY_EXTENSION_PCT) / 100);
                final long newDeadlineMs = nowMs + extensionMs;
                final List<String> receiptHandles = new ArrayList<>(toBeExtended.size());
                for (String messageId : toBeExtended) {
                    // Remove and re-insert so the entry moves to the end of the map.
                    final InFlightState previous = this.inFlight.remove(messageId);
                    this.inFlight.put(
                        messageId,
                        new InFlightState(previous.receiptHandle, previous.requestTimeMsSinceEpoch, newDeadlineMs));
                    receiptHandles.add(previous.receiptHandle);
                }
                extendBatch(nowMs, receiptHandles, (int) (extensionMs / 1000));
            }
        }
    }

    /**
     * Sets the visibility timeout of a batch of messages, retrying entries that fail.
     *
     * <p>Failures with code {@code ReceiptHandleIsInvalid} are not retried; any other failure is
     * retried until {@code BATCH_OPERATION_MAX_RETIRES} attempts have been made. An empty list
     * results in no request. On success the size of the input list is added to the
     * extended-deadlines statistic.
     *
     * @param nowMsSinceEpoch time under which the statistic is recorded, in ms since epoch
     * @param receiptHandles receipt handles of the messages to update
     * @param extensionSec visibility timeout to set, in seconds
     * @throws IOException if entries are still failing once the attempt limit is reached
     */
    void extendBatch(long nowMsSinceEpoch, List<String> receiptHandles, int extensionSec) throws IOException {
        Objects.requireNonNull(receiptHandles);
        final int requested = receiptHandles.size();
        // Batch entry id (position in the input list, as a string) -> receipt handle.
        final Map<String, String> pendingReceipts =
            IntStream.range(0, requested)
                .boxed()
                .collect(Collectors.toMap(Object::toString, receiptHandles::get));

        List<String> errorMessages = new ArrayList<>();
        int attempts = 0;
        while (!pendingReceipts.isEmpty()) {
            if (attempts >= BATCH_OPERATION_MAX_RETIRES) {
                throw new IOException("Failed to extend visibility timeout for " + pendingReceipts.size() + " messages after " + attempts + " retries: " + String.join(", ", errorMessages));
            }

            final List<ChangeMessageVisibilityBatchRequestEntry> entries =
                pendingReceipts.entrySet().stream()
                    .map(entry ->
                        new ChangeMessageVisibilityBatchRequestEntry(entry.getKey(), entry.getValue())
                            .withVisibilityTimeout(extensionSec))
                    .collect(Collectors.toList());

            // Collect ids and messages of the failures worth retrying in one pass.
            final Set<String> retryIds = new HashSet<>();
            final List<String> retryMessages = new ArrayList<>();
            this.source.getSqs()
                .changeMessageVisibilityBatch(this.source.getRead().queueUrl(), entries)
                .getFailed()
                .stream()
                .filter(failure -> !failure.getCode().equals("ReceiptHandleIsInvalid"))
                .forEach(failure -> {
                    retryIds.add(failure.getId());
                    retryMessages.add(failure.getMessage());
                });

            pendingReceipts.keySet().retainAll(retryIds);
            errorMessages = retryMessages;
            attempts++;
        }
        this.numExtendedDeadlines.add(nowMsSinceEpoch, requested);
    }

    /**
     * Writes one debug-level line of reader statistics, at most once per {@code LOG_PERIOD}.
     *
     * <p>The first call only records the time. When debug logging is disabled the statistics are
     * not gathered at all, because gathering them includes {@link #getTotalBacklogBytes()}, which
     * makes a request to SQS.
     */
    private void stats() {
        final long nowMs = now();
        if (this.lastLogTimestampMsSinceEpoch < 0) {
            this.lastLogTimestampMsSinceEpoch = nowMs;
            return;
        }
        if (nowMs - this.lastLogTimestampMsSinceEpoch < LOG_PERIOD.getMillis()) {
            return;
        }
        if (!LOG.isDebugEnabled()) {
            // Nothing would be written; skip the work, including the remote backlog lookup.
            this.lastLogTimestampMsSinceEpoch = nowMs;
            return;
        }

        final String messageSkew = describeSkew(
            this.minReceivedTimestampMsSinceEpoch.get(nowMs),
            this.maxReceivedTimestampMsSinceEpoch.get(nowMs));
        final String watermarkSkew = describeSkew(
            this.minWatermarkMsSinceEpoch.get(nowMs),
            this.maxWatermarkMsSinceEpoch.get(nowMs));

        final String oldestId = Iterables.getFirst(this.inFlight.keySet(), null);
        final String oldestInFlight =
            oldestId != null ? (nowMs - this.inFlight.get(oldestId).requestTimeMsSinceEpoch) + "ms" : "no";

        LOG.debug(
            "SQS {} has "
                + "{} received messages, "
                + "{} current unread messages, "
                + "{} current unread bytes, "
                + "{} current in-flight msgs, "
                + "{} oldest in-flight, "
                + "{} current in-flight checkpoints, "
                + "{} max in-flight checkpoints, "
                + "{} bytes in backlog, "
                + "{}B/s recent read, "
                + "{} recent received, "
                + "{} recent extended, "
                + "{} recent late extended, "
                + "{} recent deleted, "
                + "{} recent released, "
                + "{} recent expired, "
                + "{} recent message timestamp skew, "
                + "{} recent watermark skew, "
                + "{} recent late messages, "
                + "{} last reported watermark",
            new Object[] {
                this.source.getRead().queueUrl(),
                this.numReceived,
                this.messagesNotYetRead.size(),
                this.notYetReadBytes,
                this.inFlight.size(),
                oldestInFlight,
                this.numInFlightCheckpoints.get(),
                this.maxInFlightCheckpoints,
                getTotalBacklogBytes(),
                this.numReadBytes.get(nowMs) / (SAMPLE_PERIOD.getMillis() / 1000),
                this.numReceivedRecently.get(nowMs),
                this.numExtendedDeadlines.get(nowMs),
                this.numLateDeadlines.get(nowMs),
                this.numDeleted.get(nowMs),
                this.numReleased.get(nowMs),
                this.numExpired.get(nowMs),
                messageSkew,
                watermarkSkew,
                this.numLateMessages.get(nowMs),
                new Instant(this.lastWatermarkMsSinceEpoch)
            });
        this.lastLogTimestampMsSinceEpoch = nowMs;
    }

    /** Formats {@code max - min} as milliseconds, or "unknown" when either bound has no samples. */
    private static String describeSkew(long min, long max) {
        if (min < Long.MAX_VALUE && max > Long.MIN_VALUE) {
            return (max - min) + "ms";
        }
        return "unknown";
    }

    /**
     * Averages the body sizes of the most recently read messages.
     *
     * @return the mean size in bytes, truncated to a long, or -1 if no message has been read yet
     */
    private long avgMessageBytes() {
        final int samples = recentMessageBytes.size();
        if (samples == 0) {
            return -1L;
        }
        long totalBytes = 0L;
        for (Integer size : recentMessageBytes) {
            totalBytes += size.intValue();
        }
        return (long) ((double) totalBytes / samples);
    }

    /**
     * Reads the message's {@code SentTimestamp} system attribute and converts it to an
     * {@link Instant}; the attribute value is parsed as epoch milliseconds.
     *
     * @param message message carrying the {@code SentTimestamp} attribute
     * @return the sent time of the message
     */
    private Instant getTimestamp(Message message) {
        final String attributeName = MessageSystemAttributeName.SentTimestamp.toString();
        final String sentMillis = message.getAttributes().get(attributeName);
        return new Instant(Long.parseLong(sentMillis));
    }

    /**
     * Stamps a message with the time of the receive request that returned it. The message's
     * attribute map is replaced by a new map holding only the {@code requestTimeMsSinceEpoch}
     * entry.
     *
     * @param message message to stamp
     * @param j request time in milliseconds since the epoch
     */
    private void setRequestTimeMsSinceEpoch(Message message, long j) {
        final MessageAttributeValue stamp = new MessageAttributeValue().withStringValue(Long.toString(j));
        final Map<String, MessageAttributeValue> attributes = new HashMap<>();
        attributes.put("requestTimeMsSinceEpoch", stamp);
        message.setMessageAttributes(attributes);
    }

    /**
     * Reads back the request time stored by {@code setRequestTimeMsSinceEpoch}.
     *
     * @param message a message that was stamped with {@code requestTimeMsSinceEpoch}
     * @return the stored request time in milliseconds since the epoch
     */
    private Long getRequestTimeMsSinceEpoch(Message message) {
        final MessageAttributeValue stamp = message.getMessageAttributes().get("requestTimeMsSinceEpoch");
        return Long.valueOf(stamp.getStringValue());
    }
}
