package org.apache.flink.connector.pulsar.testutils.function;

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/pulsar/testutils/function/ControlSource.class */
public class ControlSource extends AbstractRichFunction implements SourceFunction<String>, CheckpointListener, CheckpointedFunction {
    private static final long serialVersionUID = -3124248855144675017L;
    private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
    private final SharedReference<MessageGenerator> sharedGenerator;
    private final SharedReference<StopSignal> sharedSignal;
    private Object lock;

    /* loaded from: input_file:org/apache/flink/connector/pulsar/testutils/function/ControlSource$MessageGenerator.class */
    private static class MessageGenerator implements Iterator<String> {
        private final String topic;
        private final DeliveryGuarantee guarantee;
        private final int messageCounts;
        private final List<String> expectedRecords;
        private final Duration interval;

        public MessageGenerator(String str, DeliveryGuarantee deliveryGuarantee, int i, Duration duration) {
            this.topic = str;
            this.guarantee = deliveryGuarantee;
            this.messageCounts = i;
            this.expectedRecords = new ArrayList(i);
            this.interval = duration;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.messageCounts > this.expectedRecords.size();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public String next() {
            String str = this.guarantee.name() + "-" + this.topic + "-" + this.expectedRecords.size() + "-" + RandomStringUtils.randomAlphanumeric(10);
            this.expectedRecords.add(str);
            Uninterruptibles.sleepUninterruptibly(this.interval);
            return str;
        }

        public List<String> getExpectedRecords() {
            return this.expectedRecords;
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/pulsar/testutils/function/ControlSource$StopSignal.class */
    private static class StopSignal implements Closeable {
        private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
        private final String topic;
        private final int desiredCounts;
        private final List<String> consumedRecords;
        private final AtomicLong deadline;
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        public StopSignal(PulsarRuntimeOperator pulsarRuntimeOperator, String str, int i, Duration duration) {
            this.topic = str;
            this.desiredCounts = i;
            this.consumedRecords = Collections.synchronizedList(new ArrayList(i));
            this.deadline = new AtomicLong(duration.toMillis() + System.currentTimeMillis());
            this.executor.execute(() -> {
                while (this.consumedRecords.size() < this.desiredCounts) {
                    Iterator it = pulsarRuntimeOperator.receiveMessages(this.topic, Schema.STRING, this.desiredCounts - this.consumedRecords.size()).iterator();
                    while (it.hasNext()) {
                        this.consumedRecords.add(((Message) it.next()).getValue());
                    }
                }
            });
        }

        public boolean canStop() {
            if (this.deadline.get() >= System.currentTimeMillis()) {
                return this.consumedRecords.size() >= this.desiredCounts;
            }
            LOG.warn(String.format("Timeout for waiting the records from Pulsar. We have consumed %d messages, expect %d messages.", Integer.valueOf(this.consumedRecords.size()), Integer.valueOf(this.desiredCounts)));
            return true;
        }

        public List<String> getConsumedRecords() {
            return this.consumedRecords;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.executor.shutdown();
        }
    }

    public ControlSource(SharedObjectsExtension sharedObjectsExtension, PulsarRuntimeOperator pulsarRuntimeOperator, String str, DeliveryGuarantee deliveryGuarantee, int i, Duration duration, Duration duration2) {
        MessageGenerator messageGenerator = new MessageGenerator(str, deliveryGuarantee, i, duration);
        StopSignal stopSignal = new StopSignal(pulsarRuntimeOperator, str, i, duration2);
        this.sharedGenerator = sharedObjectsExtension.add(messageGenerator);
        this.sharedSignal = sharedObjectsExtension.add(stopSignal);
    }

    public void run(SourceFunction.SourceContext<String> sourceContext) {
        MessageGenerator messageGenerator = (MessageGenerator) this.sharedGenerator.get();
        StopSignal stopSignal = (StopSignal) this.sharedSignal.get();
        this.lock = sourceContext.getCheckpointLock();
        while (!stopSignal.canStop()) {
            synchronized (this.lock) {
                if (messageGenerator.hasNext()) {
                    sourceContext.collect(messageGenerator.next());
                }
            }
        }
    }

    public List<String> getExpectedRecords() {
        return ((MessageGenerator) this.sharedGenerator.get()).getExpectedRecords();
    }

    public List<String> getConsumedRecords() {
        return ((StopSignal) this.sharedSignal.get()).getConsumedRecords();
    }

    public void cancel() {
        LOG.warn("Triggering cancel action. Set the stop timeout to zero.");
        ((StopSignal) this.sharedSignal.get()).deadline.set(System.currentTimeMillis());
    }

    public void close() throws Exception {
        ((StopSignal) this.sharedSignal.get()).close();
    }

    public void notifyCheckpointComplete(long j) {
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) {
    }
}
