package org.apache.beam.sdk.io.aws2.sqs;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BucketingFunction;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Collections2;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.class */
public class SqsUnboundedReader extends UnboundedSource.UnboundedReader<SqsMessage> {
    /** Batch error code that isHandleInvalid() compares against BatchResultErrorEntry.code(). */
    private static final String RECEIPT_HANDLE_IS_INVALID = "ReceiptHandleIsInvalid";
    // NOTE(review): not referenced by name in this file; pull() passes the literal 10 to
    // maxNumberOfMessages, presumably this constant inlined by the compiler — confirm.
    public static final int MAX_NUMBER_OF_MESSAGES = 10;
    /**
     * Number of attempts deleteBatch() and extendBatch() make before throwing IOException.
     * ("RETIRES" is presumably a typo for "RETRIES"; the name is kept because it is referenced.)
     */
    private static final int BATCH_OPERATION_MAX_RETIRES = 5;
    // NOTE(review): not referenced by name; extend() uses the literal 50 as the percentage of
    // the visibility timeout by which a deadline is pushed out — presumably this constant.
    private static final int VISIBILITY_EXTENSION_PCT = 50;
    // NOTE(review): not referenced by name; extend() uses the literal 20 as the percentage of
    // the visibility timeout before the deadline at which extension starts — presumably this.
    private static final int VISIBILITY_SAFETY_PCT = 20;
    // NOTE(review): not referenced by name; delete() flushes a batch at the literal 10.
    private static final int DELETE_BATCH_SIZE = 10;
    /** pull() does not receive more messages once inFlight holds at least this many entries. */
    private static final int MAX_IN_FLIGHT = 20000;
    // NOTE(review): not referenced by name; newFun() and the constructor pass the literal 10
    // right after MIN_WATERMARK_SPREAD — presumably this constant. Confirm.
    private static final int MIN_WATERMARK_MESSAGES = 10;
    /** Passed to the MovingFunction / BucketingFunction constructors (see newFun()). */
    private static final int MIN_WATERMARK_SPREAD = 2;
    /** Source this reader was created for; provides the queue URL and client settings. */
    private final SqsUnboundedSource source;
    /** Time source behind now(); injectable through the package-private constructor. */
    private final Clock clock;
    /** Options handed to ClientBuilderFactory when initClient() builds the client itself. */
    private final AwsOptions awsOptions;
    /** True until close() is called; maybeCloseClient() keeps the client open while true. */
    private AtomicBoolean active;
    /** Created on demand by initClient(); null until then. */
    private SqsClient sqsClient;
    /** Message returned by the getCurrent* accessors; null before start() and at end of data. */
    private SqsMessage current;
    /** Messages received by pull() that advance() has not handed out yet. */
    private final Queue<SqsMessage> messagesNotYetRead;
    /** Ids of messages already handed out by advance(); passed to each checkpoint mark. */
    private Set<String> safeToDeleteIds;
    /** Queue visibility timeout in milliseconds; -1 until start() reads it from the queue. */
    private long visibilityTimeoutMs;
    /** UTF-8 byte total of the bodies currently in messagesNotYetRead. */
    private long notYetReadBytes;
    /** Sent timestamps of received-but-unread messages; an input of getWatermark(). */
    private BucketingFunction minUnreadTimestampMsSinceEpoch;
    /** Timestamps of recently read messages; an input of getWatermark(). */
    private MovingFunction minReadTimestampMsSinceEpoch;
    /** Message id -> receipt handle and deadlines, in insertion order (extend() re-inserts). */
    private final LinkedHashMap<String, InFlightState> inFlight;
    /** Id lists queued by delete() and drained by retire(). */
    private final Queue<List<String>> deletedIds;
    /** Time of the last pull() that returned messages; -1 if none yet. */
    private long lastReceivedMsSinceEpoch;
    /** Last watermark value computed by getWatermark(). */
    private long lastWatermarkMsSinceEpoch;
    /** Time stats() last logged; -1 before its first call. */
    private long lastLogTimestampMsSinceEpoch;
    /** Total number of messages received by pull(). */
    private long numReceived;
    // The functions below are statistics that stats() writes to the log.
    private MovingFunction numReceivedRecently;
    private MovingFunction numExtendedDeadlines;
    private MovingFunction numLateDeadlines;
    private MovingFunction numDeleted;
    private MovingFunction numExpired;
    private MovingFunction numReleased;
    private MovingFunction numReadBytes;
    private MovingFunction minReceivedTimestampMsSinceEpoch;
    private MovingFunction maxReceivedTimestampMsSinceEpoch;
    private MovingFunction minWatermarkMsSinceEpoch;
    private MovingFunction maxWatermarkMsSinceEpoch;
    private MovingFunction numLateMessages;
    /** Incremented by getCheckpointMark(); maybeCloseClient() closes the client only at zero. */
    AtomicInteger numInFlightCheckpoints;
    /** Highest value of numInFlightCheckpoints observed by getCheckpointMark(). */
    private int maxInFlightCheckpoints;
    private static final Logger LOG = LoggerFactory.getLogger(SqsUnboundedReader.class);
    /** extend() stops extending a message this long after it was requested. */
    private static final Duration PROCESSING_TIMEOUT = Duration.standardMinutes(2);
    /** extend() gives up on entries whose deadline is closer than this (or already past). */
    private static final Duration VISIBILITY_TOO_LATE = Duration.standardSeconds(2);
    /** Sample period passed to every MovingFunction; also the idle threshold in getWatermark(). */
    private static final Duration SAMPLE_PERIOD = Duration.standardMinutes(1);
    /** Update period passed to the MovingFunction and BucketingFunction constructors. */
    private static final Duration SAMPLE_UPDATE = Duration.standardSeconds(5);
    /** Minimum interval between two stats() log lines. */
    private static final Duration LOG_PERIOD = Duration.standardSeconds(30);
    private static final Combine.BinaryCombineLongFn MIN = Min.ofLongs();
    private static final Combine.BinaryCombineLongFn MAX = Max.ofLongs();
    private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader$InFlightState.class */
    /** Mutable bookkeeping record for one received message that has not been deleted yet. */
    public static class InFlightState {
        /** Receipt handle used for delete and visibility-change requests. */
        String receiptHandle;
        /** Reader time at which the message was requested. */
        long requestTimeMsSinceEpoch;
        /** Time at which the current visibility window ends; updated on extension. */
        long visibilityDeadlineMsSinceEpoch;

        /**
         * @param receiptHandle receipt handle of the message
         * @param requestTimeMsSinceEpoch time the message was requested, in epoch millis
         * @param visibilityDeadlineMsSinceEpoch end of the visibility window, in epoch millis
         */
        public InFlightState(
                String receiptHandle, long requestTimeMsSinceEpoch, long visibilityDeadlineMsSinceEpoch) {
            this.visibilityDeadlineMsSinceEpoch = visibilityDeadlineMsSinceEpoch;
            this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
            this.receiptHandle = receiptHandle;
        }
    }

    /**
     * Creates a moving function over {@code SAMPLE_PERIOD}, updated every {@code SAMPLE_UPDATE},
     * that combines its samples with the given function.
     */
    private static MovingFunction newFun(Combine.BinaryCombineLongFn combineFn) {
        long samplePeriodMs = SAMPLE_PERIOD.getMillis();
        long sampleUpdateMs = SAMPLE_UPDATE.getMillis();
        return new MovingFunction(
                samplePeriodMs, sampleUpdateMs, MIN_WATERMARK_SPREAD, MIN_WATERMARK_MESSAGES, combineFn);
    }

    /**
     * Creates a reader that takes its time from the system UTC clock.
     *
     * @param source source this reader belongs to
     * @param checkpoint checkpoint to resume from, or {@code null}
     * @param options AWS options used when the reader has to build its own client
     * @throws IOException if releasing the checkpoint's unread messages fails
     */
    public SqsUnboundedReader(SqsUnboundedSource source, SqsCheckpointMark checkpoint, AwsOptions options)
            throws IOException {
        this(source, checkpoint, options, Clock.systemUTC());
    }

    /**
     * Creates a reader with an explicit clock.
     *
     * <p>When a checkpoint is supplied, the client is initialised immediately and the
     * checkpoint's not-yet-read receipt handles are released through
     * {@link #expireBatchForRedelivery(List)}.
     *
     * @throws IOException if releasing the checkpoint's unread messages fails
     */
    @VisibleForTesting
    SqsUnboundedReader(
            SqsUnboundedSource source, SqsCheckpointMark checkpoint, AwsOptions options, Clock clock)
            throws IOException {
        // Collaborators.
        this.source = source;
        this.awsOptions = options;
        this.clock = clock;
        this.sqsClient = null;
        this.active = new AtomicBoolean(true);

        // Message bookkeeping.
        this.current = null;
        this.messagesNotYetRead = new ArrayDeque<>(MAX_NUMBER_OF_MESSAGES);
        this.notYetReadBytes = 0L;
        this.safeToDeleteIds = new HashSet<>();
        this.inFlight = new LinkedHashMap<>();
        this.deletedIds = new ConcurrentLinkedQueue<>();
        this.visibilityTimeoutMs = -1L;
        this.numInFlightCheckpoints = new AtomicInteger();
        this.maxInFlightCheckpoints = 0;

        // Watermark inputs.
        this.minUnreadTimestampMsSinceEpoch =
                new BucketingFunction(
                        SAMPLE_UPDATE.getMillis(), MIN_WATERMARK_SPREAD, MIN_WATERMARK_MESSAGES, MIN);
        this.minReadTimestampMsSinceEpoch = newFun(MIN);
        this.lastReceivedMsSinceEpoch = -1L;
        this.lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();

        // Statistics reported by stats().
        this.lastLogTimestampMsSinceEpoch = -1L;
        this.numReceived = 0L;
        this.numReceivedRecently = newFun(SUM);
        this.numExtendedDeadlines = newFun(SUM);
        this.numLateDeadlines = newFun(SUM);
        this.numDeleted = newFun(SUM);
        this.numExpired = newFun(SUM);
        this.numReleased = newFun(SUM);
        this.numReadBytes = newFun(SUM);
        this.numLateMessages = newFun(SUM);
        this.minReceivedTimestampMsSinceEpoch = newFun(MIN);
        this.maxReceivedTimestampMsSinceEpoch = newFun(MAX);
        this.minWatermarkMsSinceEpoch = newFun(MIN);
        this.maxWatermarkMsSinceEpoch = newFun(MAX);

        // Must stay last: both calls rely on the fields initialised above.
        if (checkpoint != null) {
            initClient();
            expireBatchForRedelivery(checkpoint.notYetReadReceipts);
        }
    }

    /**
     * Computes and returns the current watermark.
     *
     * <p>If neither the read nor the unread timestamp tracker holds a value and nothing has been
     * received for more than {@code SAMPLE_PERIOD}, the watermark jumps to the current time.
     * Otherwise, if either tracker reports a significant sample, the watermark becomes the
     * smaller of the two minima. In every other case the previous watermark is returned again.
     */
    public Instant getWatermark() {
        final long nowMs = now();
        final long minReadMs = this.minReadTimestampMsSinceEpoch.get(nowMs);
        final long minUnreadMs = this.minUnreadTimestampMsSinceEpoch.get();

        boolean noTimestamps = minReadMs == Long.MAX_VALUE && minUnreadMs == Long.MAX_VALUE;
        boolean idleTooLong =
                this.lastReceivedMsSinceEpoch >= 0
                        && nowMs > this.lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis();

        if (noTimestamps && idleTooLong) {
            this.lastWatermarkMsSinceEpoch = nowMs;
        } else if (this.minReadTimestampMsSinceEpoch.isSignificant()
                || this.minUnreadTimestampMsSinceEpoch.isSignificant()) {
            this.lastWatermarkMsSinceEpoch = Math.min(minReadMs, minUnreadMs);
        }

        // Record the reported value for the watermark-skew statistic.
        long reportedMs = this.lastWatermarkMsSinceEpoch;
        this.minWatermarkMsSinceEpoch.add(nowMs, reportedMs);
        this.maxWatermarkMsSinceEpoch.add(nowMs, reportedMs);
        return new Instant(reportedMs);
    }

    /**
     * Returns the message made current by the last successful {@code start()} or
     * {@code advance()}.
     *
     * <p>The decompiler renamed this method to {@code m60getCurrent}, which left the reader
     * without an implementation of the inherited {@code getCurrent()}; the original name is
     * restored here.
     *
     * @throws NoSuchElementException if there is no current message
     */
    @Override
    public SqsMessage getCurrent() throws NoSuchElementException {
        if (this.current == null) {
            throw new NoSuchElementException();
        }
        return this.current;
    }

    /**
     * Decompiler-generated name for {@link #getCurrent()}, kept so that call sites using the
     * mangled name keep compiling.
     *
     * @deprecated use {@link #getCurrent()}
     * @throws NoSuchElementException if there is no current message
     */
    @Deprecated
    public SqsMessage m60getCurrent() throws NoSuchElementException {
        return getCurrent();
    }

    /**
     * Returns the timestamp of the current message.
     *
     * @throws NoSuchElementException if there is no current message
     */
    public Instant getCurrentTimestamp() throws NoSuchElementException {
        SqsMessage message = this.current;
        if (message != null) {
            return new Instant(message.getTimeStamp());
        }
        throw new NoSuchElementException();
    }

    /**
     * Returns the current message's id encoded as UTF-8 bytes.
     *
     * @throws NoSuchElementException if there is no current message
     */
    public byte[] getCurrentRecordId() throws NoSuchElementException {
        SqsMessage message = this.current;
        if (message != null) {
            String messageId = message.getMessageId();
            return messageId.getBytes(StandardCharsets.UTF_8);
        }
        throw new NoSuchElementException();
    }

    /**
     * Creates a checkpoint mark from the ids that are safe to delete and the receipt handles of
     * the messages not yet read, and counts it as an in-flight checkpoint.
     */
    public UnboundedSource.CheckpointMark getCheckpointMark() {
        int inFlightNow = this.numInFlightCheckpoints.incrementAndGet();
        if (inFlightNow > this.maxInFlightCheckpoints) {
            this.maxInFlightCheckpoints = inFlightNow;
        }
        return new SqsCheckpointMark(
                this,
                this.safeToDeleteIds,
                Collections2.transform(this.messagesNotYetRead, SqsMessage::getReceiptHandle));
    }

    /**
     * Returns the source this reader was created from.
     *
     * <p>The decompiler renamed this method to {@code m59getCurrentSource}, which left the
     * reader without an implementation of the inherited {@code getCurrentSource()}; the
     * original name is restored here.
     */
    @Override
    public SqsUnboundedSource getCurrentSource() {
        return this.source;
    }

    /**
     * Decompiler-generated name for {@link #getCurrentSource()}, kept so that call sites using
     * the mangled name keep compiling.
     *
     * @deprecated use {@link #getCurrentSource()}
     */
    @Deprecated
    public SqsUnboundedSource m59getCurrentSource() {
        return getCurrentSource();
    }

    /**
     * Initialises the client, reads the queue's visibility timeout (reported in seconds, stored
     * here in milliseconds) and advances to the first message.
     *
     * @return {@code true} if a message is available
     * @throws IOException if advancing fails
     */
    public boolean start() throws IOException {
        initClient();
        String timeoutSeconds =
                this.sqsClient
                        .getQueueAttributes(
                                request ->
                                        request.queueUrl(queueUrl())
                                                .attributeNames(QueueAttributeName.VISIBILITY_TIMEOUT))
                        .attributes()
                        .get(QueueAttributeName.VISIBILITY_TIMEOUT);
        this.visibilityTimeoutMs = Integer.parseInt(timeoutSeconds) * 1000;
        return advance();
    }

    /** Returns the URL of the queue configured on the source's read transform. */
    private String queueUrl() {
        String url = this.source.getRead().queueUrl();
        return url;
    }

    /**
     * Creates the SQS client on first use: taken from the configured client provider when one
     * is set, otherwise built from the AWS options and the read's client configuration.
     */
    private void initClient() {
        if (this.sqsClient != null) {
            return; // already initialised
        }
        if (this.source.getRead().sqsClientProvider() == null) {
            this.sqsClient =
                    ClientBuilderFactory.buildClient(
                            this.awsOptions, SqsClient.builder(), this.source.getRead().clientConfiguration());
            return;
        }
        this.sqsClient = this.source.getRead().sqsClientProvider().getSqsClient();
    }

    /**
     * Moves to the next message.
     *
     * <p>Logs statistics if due, releases the previous message from the unread tracker, applies
     * pending deletions, extends visibility deadlines and, if no buffered message is left, pulls
     * from the queue.
     *
     * @return {@code true} if a message is now current, {@code false} if none is available
     * @throws IOException if extending visibility deadlines fails
     */
    public boolean advance() throws IOException {
        stats();

        SqsMessage previous = this.current;
        if (previous != null) {
            this.minUnreadTimestampMsSinceEpoch.remove(previous.getRequestTimeStamp());
            this.current = null;
        }

        retire();
        extend();

        if (this.messagesNotYetRead.isEmpty()) {
            pull();
        }

        SqsMessage next = this.messagesNotYetRead.poll();
        this.current = next;
        if (next == null) {
            return false;
        }

        int bodyBytes = next.getBody().getBytes(StandardCharsets.UTF_8).length;
        this.notYetReadBytes -= bodyBytes;
        Preconditions.checkState(this.notYetReadBytes >= 0);

        long nowMs = now();
        long messageTimestampMs = getCurrentTimestamp().getMillis();
        this.numReadBytes.add(nowMs, bodyBytes);
        this.minReadTimestampMsSinceEpoch.add(nowMs, messageTimestampMs);
        if (messageTimestampMs < this.lastWatermarkMsSinceEpoch) {
            // The message is older than the watermark already reported.
            this.numLateMessages.add(nowMs, 1L);
        }

        this.safeToDeleteIds.add(next.getMessageId());
        return true;
    }

    /**
     * Marks the reader inactive and closes the client unless checkpoints are still in flight.
     *
     * @throws IOException declared by {@link #maybeCloseClient()}
     */
    public void close() throws IOException {
        active.set(false);
        maybeCloseClient();
    }

    /**
     * Closes the client once the reader is inactive and no checkpoints are in flight. Does
     * nothing while the reader is active, checkpoints are outstanding, or no client exists.
     */
    public void maybeCloseClient() throws IOException {
        boolean finished = !this.active.get() && this.numInFlightCheckpoints.get() == 0;
        if (finished && this.sqsClient != null) {
            this.sqsClient.close();
        }
    }

    /**
     * Deletes the given messages from the queue in batches of {@code DELETE_BATCH_SIZE} and then
     * queues the id list for {@link #retire()}. Ids that are no longer in flight are skipped.
     *
     * @param list ids of the messages to delete
     * @throws IOException if a batch still has failures after the maximum number of attempts
     */
    public void delete(List<String> list) throws IOException {
        List<String> receiptHandles = new ArrayList<>(DELETE_BATCH_SIZE);
        for (String messageId : list) {
            InFlightState state = this.inFlight.get(messageId);
            if (state == null) {
                continue;
            }
            receiptHandles.add(state.receiptHandle);
            if (receiptHandles.size() == DELETE_BATCH_SIZE) {
                deleteBatch(receiptHandles);
                receiptHandles.clear();
            }
        }
        // Flush the final, partially filled batch.
        if (!receiptHandles.isEmpty()) {
            deleteBatch(receiptHandles);
        }
        this.deletedIds.add(list);
    }

    /**
     * Deletes one batch of messages by receipt handle, retrying the entries that failed.
     *
     * <p>Entries rejected with an invalid receipt handle are dropped and only counted in a
     * warning; all other failures are retried.
     *
     * @param list receipt handles to delete
     * @throws IOException if entries are still pending after {@code BATCH_OPERATION_MAX_RETIRES}
     *     attempts
     */
    private void deleteBatch(List<String> list) throws IOException {
        // Entry ids are the positions within the batch.
        Map<String, DeleteMessageBatchRequestEntry> pending =
                Streams.mapWithIndex(
                                list.stream(),
                                (receiptHandle, index) ->
                                        DeleteMessageBatchRequestEntry.builder()
                                                .id(Long.toString(index))
                                                .receiptHandle(receiptHandle)
                                                .build())
                        .collect(Collectors.toMap(DeleteMessageBatchRequestEntry::id, Function.identity()));

        for (int attempt = 0; !pending.isEmpty(); attempt++) {
            if (attempt >= BATCH_OPERATION_MAX_RETIRES) {
                throw new IOException(
                        "Failed to delete " + pending.size() + " messages after " + attempt + " retries");
            }
            DeleteMessageBatchRequest request =
                    DeleteMessageBatchRequest.builder().queueUrl(queueUrl()).entries(pending.values()).build();
            // TRUE -> ids with an invalid receipt handle, FALSE -> ids that failed otherwise.
            Map<Boolean, Set<String>> failedIds =
                    this.sqsClient.deleteMessageBatch(request).failed().stream()
                            .collect(
                                    Collectors.partitioningBy(
                                            this::isHandleInvalid,
                                            Collectors.mapping(BatchResultErrorEntry::id, Collectors.toSet())));

            pending.keySet().retainAll(failedIds.getOrDefault(Boolean.FALSE, ImmutableSet.of()));

            int invalidHandles = failedIds.getOrDefault(Boolean.TRUE, ImmutableSet.of()).size();
            if (invalidHandles > 0) {
                LOG.warn("Failed to delete {} messages due to expired receipt handles.", invalidHandles);
            }
        }
    }

    private boolean isHandleInvalid(BatchResultErrorEntry batchResultErrorEntry) {
        return RECEIPT_HANDLE_IS_INVALID.equals(batchResultErrorEntry.code());
    }

    /**
     * Drains the queue of deleted id lists: counts each list as deleted and removes its ids
     * from the in-flight map and from the safe-to-delete set.
     */
    private void retire() {
        final long nowMs = now();
        for (List<String> ids = this.deletedIds.poll(); ids != null; ids = this.deletedIds.poll()) {
            this.numDeleted.add(nowMs, ids.size());
            for (String messageId : ids) {
                this.inFlight.remove(messageId);
                this.safeToDeleteIds.remove(messageId);
            }
        }
    }

    /**
     * Receives up to {@code MAX_NUMBER_OF_MESSAGES} messages, buffers them as unread and
     * registers them as in flight. Does nothing once {@code MAX_IN_FLIGHT} messages are in
     * flight.
     */
    private void pull() {
        if (this.inFlight.size() >= MAX_IN_FLIGHT) {
            return;
        }

        final long requestTimeMs = now();
        final long visibilityDeadlineMs = requestTimeMs + this.visibilityTimeoutMs;

        ReceiveMessageRequest request =
                ReceiveMessageRequest.builder()
                        .maxNumberOfMessages(MAX_NUMBER_OF_MESSAGES)
                        .attributeNamesWithStrings(MessageSystemAttributeName.SENT_TIMESTAMP.toString())
                        .queueUrl(queueUrl())
                        .build();
        List<Message> received = this.sqsClient.receiveMessage(request).messages();
        if (received == null || received.isEmpty()) {
            return;
        }

        this.lastReceivedMsSinceEpoch = requestTimeMs;
        for (Message raw : received) {
            long sentTimestampMs =
                    Long.parseLong(raw.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP));
            SqsMessage message =
                    SqsMessage.create(
                            raw.body(), raw.messageId(), raw.receiptHandle(), sentTimestampMs, requestTimeMs);

            this.messagesNotYetRead.add(message);
            this.notYetReadBytes += message.getBody().getBytes(StandardCharsets.UTF_8).length;
            this.inFlight.put(
                    message.getMessageId(),
                    new InFlightState(message.getReceiptHandle(), requestTimeMs, visibilityDeadlineMs));

            this.numReceived++;
            this.numReceivedRecently.add(requestTimeMs, 1L);
            this.minReceivedTimestampMsSinceEpoch.add(requestTimeMs, sentTimestampMs);
            this.maxReceivedTimestampMsSinceEpoch.add(requestTimeMs, sentTimestampMs);
            this.minUnreadTimestampMsSinceEpoch.add(requestTimeMs, sentTimestampMs);
        }
    }

    /** Returns the reader clock's current time in milliseconds since the epoch. */
    long now() {
        long epochMillis = this.clock.millis();
        return epochMillis;
    }

    /**
     * Walks the in-flight messages in map order and repeats until nothing is due:
     *
     * <ul>
     *   <li>entries whose deadline is closer than {@code VISIBILITY_TOO_LATE} (or past) are
     *       dropped and counted as late;
     *   <li>entries requested more than {@code PROCESSING_TIMEOUT} ago are dropped and counted
     *       as expired;
     *   <li>the remaining due entries get their deadline pushed out by
     *       {@code VISIBILITY_EXTENSION_PCT} percent of the visibility timeout, at most 10 per
     *       round.
     * </ul>
     *
     * @throws IOException if a visibility extension batch fails
     */
    private void extend() throws IOException {
        // An entry becomes due once it is within this margin of its visibility deadline.
        final long safetyMarginMs = (this.visibilityTimeoutMs * VISIBILITY_SAFETY_PCT) / 100;

        while (true) {
            final long nowMs = now();
            List<String> tooLate = new ArrayList<>();
            List<String> toExtend = new ArrayList<>();
            List<String> timedOut = new ArrayList<>();

            for (Map.Entry<String, InFlightState> entry : this.inFlight.entrySet()) {
                String messageId = entry.getKey();
                InFlightState state = entry.getValue();
                if (state.visibilityDeadlineMsSinceEpoch - safetyMarginMs > nowMs) {
                    // Not due yet; the scan stops at the first such entry.
                    break;
                }
                if (state.visibilityDeadlineMsSinceEpoch - VISIBILITY_TOO_LATE.getMillis() < nowMs) {
                    tooLate.add(messageId);
                    continue;
                }
                if (state.requestTimeMsSinceEpoch + PROCESSING_TIMEOUT.getMillis() < nowMs) {
                    timedOut.add(messageId);
                    continue;
                }
                toExtend.add(messageId);
                if (toExtend.size() >= 10) {
                    // Enough for one extension batch; the outer loop picks up the rest.
                    break;
                }
            }

            if (tooLate.isEmpty() && toExtend.isEmpty() && timedOut.isEmpty()) {
                return;
            }

            if (!tooLate.isEmpty()) {
                this.numLateDeadlines.add(nowMs, tooLate.size());
                for (String messageId : tooLate) {
                    this.inFlight.remove(messageId);
                }
            }

            if (!timedOut.isEmpty()) {
                this.numExpired.add(nowMs, timedOut.size());
                for (String messageId : timedOut) {
                    this.inFlight.remove(messageId);
                }
            }

            if (!toExtend.isEmpty()) {
                long extensionMs = (int) ((this.visibilityTimeoutMs * VISIBILITY_EXTENSION_PCT) / 100);
                long newDeadlineMs = nowMs + extensionMs;
                List<KV<String, String>> idsWithHandles = new ArrayList<>(toExtend.size());
                for (String messageId : toExtend) {
                    // Remove and re-insert so the entry moves to the end of the map's order.
                    InFlightState state = this.inFlight.remove(messageId);
                    state.visibilityDeadlineMsSinceEpoch = newDeadlineMs;
                    this.inFlight.put(messageId, state);
                    idsWithHandles.add(KV.of(messageId, state.receiptHandle));
                }
                extendBatch(nowMs, idsWithHandles, (int) (extensionMs / 1000));
            }
        }
    }

    /**
     * Sets the visibility timeout of the given receipt handles to zero through
     * {@link #extendBatch(long, List, int)} and counts them as released.
     *
     * @param list receipt handles to release
     * @throws IOException if the visibility change still fails after the maximum attempts
     */
    void expireBatchForRedelivery(List<String> list) throws IOException {
        // Each handle is keyed by its position in the list, which serves as the batch entry id.
        List<KV<String, String>> indexedHandles =
                Streams.mapWithIndex(
                                list.stream(),
                                (receiptHandle, index) -> KV.of(Long.toString(index), receiptHandle))
                        .collect(Collectors.toList());
        final long nowMs = now();
        extendBatch(nowMs, indexedHandles, 0);
        this.numReleased.add(nowMs, list.size());
    }

    /**
     * Changes the visibility timeout of a batch of messages, retrying the entries that failed.
     *
     * <p>Entries rejected with an invalid receipt handle are never retried. When
     * {@code extensionSec} is positive they are additionally counted as late, removed from the
     * in-flight map and reported in a warning; with a timeout of zero they are ignored.
     *
     * @param nowMsSinceEpoch timestamp under which the statistics are recorded
     * @param messages pairs of batch entry id and receipt handle
     * @param extensionSec new visibility timeout in seconds
     * @throws IOException if entries are still pending after {@code BATCH_OPERATION_MAX_RETIRES}
     *     attempts
     */
    void extendBatch(long nowMsSinceEpoch, List<KV<String, String>> messages, int extensionSec)
            throws IOException {
        Map<String, ChangeMessageVisibilityBatchRequestEntry> pending =
                messages.stream()
                        .collect(
                                Collectors.toMap(
                                        KV::getKey,
                                        kv ->
                                                ChangeMessageVisibilityBatchRequestEntry.builder()
                                                        .visibilityTimeout(extensionSec)
                                                        .id(kv.getKey())
                                                        .receiptHandle(kv.getValue())
                                                        .build()));

        int retries = 0;
        while (!pending.isEmpty()) {
            if (retries >= BATCH_OPERATION_MAX_RETIRES) {
                throw new IOException(
                        "Failed to extend visibility timeout for "
                                + messages.size()
                                + " messages after "
                                + retries
                                + " retries");
            }

            // Keep the response: both its failed and its successful entries are needed below.
            ChangeMessageVisibilityBatchResponse response =
                    this.sqsClient.changeMessageVisibilityBatch(
                            ChangeMessageVisibilityBatchRequest.builder()
                                    .queueUrl(queueUrl())
                                    .entries(pending.values())
                                    .build());

            // TRUE -> ids with an invalid receipt handle, FALSE -> ids that failed otherwise.
            Map<Boolean, Set<String>> failures =
                    response.failed().stream()
                            .collect(
                                    Collectors.partitioningBy(
                                            this::isHandleInvalid,
                                            Collectors.mapping(BatchResultErrorEntry::id, Collectors.toSet())));

            // Only failures other than invalid handles stay pending for the next attempt.
            pending.keySet().retainAll(failures.getOrDefault(Boolean.FALSE, ImmutableSet.of()));

            if (extensionSec > 0) {
                this.numExtendedDeadlines.add(nowMsSinceEpoch, response.successful().size());
                Set<String> invalidIds = failures.getOrDefault(Boolean.TRUE, ImmutableSet.of());
                if (!invalidIds.isEmpty()) {
                    this.numLateDeadlines.add(nowMsSinceEpoch, invalidIds.size());
                    for (String messageId : invalidIds) {
                        this.inFlight.remove(messageId);
                    }
                    LOG.warn(
                            "Failed to extend visibility timeout for {} messages with expired receipt handles.",
                            invalidIds.size());
                }
            }
            retries++;
        }
    }

    /** Returns the visibility timeout in milliseconds; -1 until {@code start()} has run. */
    @VisibleForTesting
    long getVisibilityTimeoutMs() {
        long timeoutMs = this.visibilityTimeoutMs;
        return timeoutMs;
    }

    /**
     * Logs a one-line summary of the reader's counters, at most once per {@code LOG_PERIOD}.
     * The first call only records the time and logs nothing.
     */
    private void stats() {
        final long nowMs = now();
        if (this.lastLogTimestampMsSinceEpoch < 0) {
            this.lastLogTimestampMsSinceEpoch = nowMs;
            return;
        }
        long sinceLastLogMs = nowMs - this.lastLogTimestampMsSinceEpoch;
        if (sinceLastLogMs < LOG_PERIOD.getMillis()) {
            return;
        }

        long minReceivedMs = this.minReceivedTimestampMsSinceEpoch.get(nowMs);
        long maxReceivedMs = this.maxReceivedTimestampMsSinceEpoch.get(nowMs);
        String messageSkew =
                (minReceivedMs < Long.MAX_VALUE && maxReceivedMs > Long.MIN_VALUE)
                        ? (maxReceivedMs - minReceivedMs) + "ms"
                        : "unknown";

        long minWatermarkMs = this.minWatermarkMsSinceEpoch.get(nowMs);
        long maxWatermarkMs = this.maxWatermarkMsSinceEpoch.get(nowMs);
        String watermarkSkew =
                (minWatermarkMs < Long.MAX_VALUE && maxWatermarkMs > Long.MIN_VALUE)
                        ? (maxWatermarkMs - minWatermarkMs) + "ms"
                        : "unknown";

        // The first key of the in-flight map is reported as the oldest in-flight message.
        String oldestId = Iterables.getFirst(this.inFlight.keySet(), null);

        LOG.info(
                "SQS {} has {} received messages, {} current unread messages, {} current unread bytes, {} current in-flight msgs, {} oldest in-flight, {} current in-flight checkpoints, {} max in-flight checkpoints, {}B/s recent read, {} recent received, {} recent extended, {} recent late extended, {} recent deleted, {} recent released, {} recent expired, {} recent message timestamp skew, {} recent watermark skew, {} recent late messages, {} last reported watermark, {} min recent read timestamp (significance = {}), {} min recent unread timestamp (significance = {}), {} last receive timestamp",
                queueUrl(),
                this.numReceived,
                this.messagesNotYetRead.size(),
                this.notYetReadBytes,
                this.inFlight.size(),
                oldestId != null ? (nowMs - this.inFlight.get(oldestId).requestTimeMsSinceEpoch) + "ms" : "no",
                this.numInFlightCheckpoints.get(),
                this.maxInFlightCheckpoints,
                this.numReadBytes.get(nowMs) / (SAMPLE_PERIOD.getMillis() / 1000),
                this.numReceivedRecently.get(nowMs),
                this.numExtendedDeadlines.get(nowMs),
                this.numLateDeadlines.get(nowMs),
                this.numDeleted.get(nowMs),
                this.numReleased.get(nowMs),
                this.numExpired.get(nowMs),
                messageSkew,
                watermarkSkew,
                this.numLateMessages.get(nowMs),
                new Instant(this.lastWatermarkMsSinceEpoch),
                new Instant(this.minReadTimestampMsSinceEpoch.get(nowMs)),
                this.minReadTimestampMsSinceEpoch.isSignificant(),
                new Instant(this.minUnreadTimestampMsSinceEpoch.get()),
                this.minUnreadTimestampMsSinceEpoch.isSignificant(),
                new Instant(this.lastReceivedMsSinceEpoch));

        this.lastLogTimestampMsSinceEpoch = nowMs;
    }
}
