/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.testutils.function;

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControlSource
extends AbstractRichFunction
implements SourceFunction<String>,
CheckpointListener,
CheckpointedFunction {
    private static final long serialVersionUID = -3124248855144675017L;
    private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
    private final SharedReference<MessageGenerator> sharedGenerator;
    private final SharedReference<StopSignal> sharedSignal;
    private Object lock;

    public ControlSource(SharedObjectsExtension sharedObjects, PulsarRuntimeOperator operator, String topic, DeliveryGuarantee guarantee, int messageCounts, Duration interval, Duration timeout) throws PulsarClientException {
        MessageGenerator generator = new MessageGenerator(topic, guarantee, messageCounts, interval);
        StopSignal signal = new StopSignal(operator, topic, messageCounts, timeout);
        this.sharedGenerator = sharedObjects.add((Object)generator);
        this.sharedSignal = sharedObjects.add((Object)signal);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<String> ctx) {
        MessageGenerator generator = (MessageGenerator)this.sharedGenerator.get();
        StopSignal signal = (StopSignal)this.sharedSignal.get();
        this.lock = ctx.getCheckpointLock();
        while (!signal.canStop()) {
            Object object = this.lock;
            synchronized (object) {
                if (generator.hasNext()) {
                    String message = generator.next();
                    ctx.collect((Object)message);
                }
            }
        }
    }

    public List<String> getExpectedRecords() {
        MessageGenerator generator = (MessageGenerator)this.sharedGenerator.get();
        return generator.getExpectedRecords();
    }

    public List<String> getConsumedRecords() {
        StopSignal signal = (StopSignal)this.sharedSignal.get();
        return signal.getConsumedRecords();
    }

    public void cancel() {
        LOG.warn("Triggering cancel action. Set the stop timeout to zero.");
        StopSignal signal = (StopSignal)this.sharedSignal.get();
        signal.deadline.set(System.currentTimeMillis());
    }

    public void close() throws Exception {
        StopSignal signal = (StopSignal)this.sharedSignal.get();
        signal.close();
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    public void snapshotState(FunctionSnapshotContext context) {
    }

    public void initializeState(FunctionInitializationContext context) {
    }

    private static class StopSignal
    implements Closeable {
        private static final Logger LOG = LoggerFactory.getLogger(StopSignal.class);
        private final int desiredCounts;
        private final List<String> consumedRecords;
        private final AtomicLong deadline;
        private final ExecutorService executor;
        private final Consumer<String> consumer;
        private final AtomicReference<PulsarClientException> throwableException;

        public StopSignal(PulsarRuntimeOperator operator, String topic, int messageCounts, Duration timeout) throws PulsarClientException {
            this.desiredCounts = messageCounts;
            this.consumedRecords = Collections.synchronizedList(new ArrayList(messageCounts));
            this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis());
            this.executor = Executors.newSingleThreadExecutor();
            this.consumer = operator.client().newConsumer(Schema.STRING).topic(new String[]{topic}).subscriptionName(RandomStringUtils.randomAlphanumeric((int)10)).subscriptionMode(SubscriptionMode.Durable).subscriptionType(SubscriptionType.Exclusive).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            this.throwableException = new AtomicReference();
            this.executor.execute(() -> {
                block2: while (this.consumedRecords.size() < this.desiredCounts) {
                    int counts = this.desiredCounts - this.consumedRecords.size();
                    for (int i = 0; i < counts; ++i) {
                        try {
                            Message message = this.consumer.receive();
                            this.consumedRecords.add((String)message.getValue());
                            continue;
                        }
                        catch (PulsarClientException e) {
                            this.throwableException.set(e);
                            continue block2;
                        }
                    }
                }
            });
        }

        public boolean canStop() {
            PulsarClientException exception = this.throwableException.get();
            if (exception != null) {
                LOG.error("Error in consuming messages from Pulsar.");
                LOG.error("", (Throwable)exception);
                return true;
            }
            if (this.deadline.get() < System.currentTimeMillis()) {
                String errorMsg = String.format("Timeout for waiting the records from Pulsar. We have consumed %d messages, expect %d messages.", this.consumedRecords.size(), this.desiredCounts);
                LOG.warn(errorMsg);
                return true;
            }
            return this.consumedRecords.size() >= this.desiredCounts;
        }

        public List<String> getConsumedRecords() {
            return this.consumedRecords;
        }

        @Override
        public void close() {
            this.executor.shutdown();
        }
    }

    private static class MessageGenerator
    implements Iterator<String> {
        private final String topic;
        private final DeliveryGuarantee guarantee;
        private final int messageCounts;
        private final List<String> expectedRecords;
        private final Duration interval;

        public MessageGenerator(String topic, DeliveryGuarantee guarantee, int messageCounts, Duration interval) {
            this.topic = topic;
            this.guarantee = guarantee;
            this.messageCounts = messageCounts;
            this.expectedRecords = new ArrayList<String>(messageCounts);
            this.interval = interval;
        }

        @Override
        public boolean hasNext() {
            return this.messageCounts > this.expectedRecords.size();
        }

        @Override
        public String next() {
            String content = this.guarantee.name() + "-" + this.topic + "-" + this.expectedRecords.size() + "-" + RandomStringUtils.randomAlphanumeric((int)10);
            this.expectedRecords.add(content);
            Uninterruptibles.sleepUninterruptibly((Duration)this.interval);
            return content;
        }

        public List<String> getExpectedRecords() {
            return this.expectedRecords;
        }
    }
}

