package org.apache.beam.sdk.io.jms;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.jms.Message;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefaultCoder(AvroCoder.class)
/* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsCheckpointMark.class */
public class JmsCheckpointMark implements UnboundedSource.CheckpointMark {
    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
    private final State state = new State(this);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsCheckpointMark$State.class */
    public class State {
        private final ReentrantReadWriteLock lock;
        private final List<Message> messages;
        private Instant oldestPendingTimestamp;

        public State(JmsCheckpointMark jmsCheckpointMark) {
            this(new ArrayList(), BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        private State(List<Message> list, Instant instant) {
            this.lock = new ReentrantReadWriteLock();
            this.messages = list;
            this.oldestPendingTimestamp = instant;
        }

        public State snapshot() {
            return (State) atomicRead(() -> {
                return new State(new ArrayList(this.messages), this.oldestPendingTimestamp);
            });
        }

        public Instant getOldestPendingTimestamp() {
            return (Instant) atomicRead(() -> {
                return this.oldestPendingTimestamp;
            });
        }

        public List<Message> getMessages() {
            return (List) atomicRead(() -> {
                return this.messages;
            });
        }

        public void addMessage(Message message) {
            atomicWrite(() -> {
                this.messages.add(message);
            });
        }

        public void removeMessages(List<Message> list) {
            atomicWrite(() -> {
                this.messages.removeAll(list);
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void updateOldestPendingTimestampIf(Instant instant, BiFunction<Instant, Instant, Boolean> biFunction) {
            atomicWrite(() -> {
                if (((Boolean) biFunction.apply(instant, this.oldestPendingTimestamp)).booleanValue()) {
                    this.oldestPendingTimestamp = instant;
                }
            });
        }

        public <T> T atomicRead(Supplier<T> supplier) {
            this.lock.readLock().lock();
            try {
                return supplier.get();
            } finally {
                this.lock.readLock().unlock();
            }
        }

        public void atomicWrite(Runnable runnable) {
            this.lock.writeLock().lock();
            try {
                runnable.run();
            } finally {
                this.lock.writeLock().unlock();
            }
        }
    }

    protected List<Message> getMessages() {
        return this.state.getMessages();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addMessage(Message message) throws Exception {
        Instant instant = new Instant(message.getJMSTimestamp());
        this.state.atomicWrite(() -> {
            this.state.updateOldestPendingTimestampIf(instant, (v0, v1) -> {
                return v0.isBefore(v1);
            });
            this.state.addMessage(message);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Instant getOldestPendingTimestamp() {
        return this.state.getOldestPendingTimestamp();
    }

    public void finalizeCheckpoint() {
        State snapshot = this.state.snapshot();
        for (Message message : snapshot.messages) {
            try {
                message.acknowledge();
                snapshot.updateOldestPendingTimestampIf(new Instant(message.getJMSTimestamp()), (v0, v1) -> {
                    return v0.isAfter(v1);
                });
            } catch (Exception e) {
                LOG.error("Exception while finalizing message: {}", e);
            }
        }
        this.state.atomicWrite(() -> {
            this.state.removeMessages(snapshot.messages);
            this.state.updateOldestPendingTimestampIf(snapshot.oldestPendingTimestamp, (v0, v1) -> {
                return v0.isAfter(v1);
            });
        });
    }
}
