/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.testutils.sink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder;
import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.pulsar.testutils.sink.reader.PulsarPartitionDataReader;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.shade.com.google.common.io.Closer;

public abstract class PulsarSinkTestContext
extends PulsarTestContext<String>
implements DataStreamSinkV2ExternalContext<String> {
    private static final String TOPIC_NAME_PREFIX = "persistent://public/default/flink-sink-topic-";
    private static final int RECORD_SIZE_UPPER_BOUND = 300;
    private static final int RECORD_SIZE_LOWER_BOUND = 100;
    private static final int RECORD_STRING_SIZE = 20;
    private List<String> topics = this.generateTopics();
    private final Closer closer = Closer.create();

    public PulsarSinkTestContext(PulsarTestEnvironment environment) {
        super(environment, Schema.STRING);
    }

    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
        if (this.creatTopic()) {
            for (String topic : this.topics) {
                try {
                    this.operator.createTopic(topic, 4);
                }
                catch (Exception e) {
                    throw new FlinkRuntimeException((Throwable)e);
                }
            }
        }
        DeliveryGuarantee guarantee = PulsarTestCommonUtils.toDeliveryGuarantee(sinkSettings.getCheckpointingMode());
        PulsarSinkBuilder builder = PulsarSink.builder().setServiceUrl(this.operator.serviceUrl()).setDeliveryGuarantee(guarantee).setSerializationSchema(this.schema).enableSchemaEvolution().setConfig(PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES, (Object)4);
        if (this.creatTopic()) {
            builder.setTopics(this.topics).setTopicRoutingMode(TopicRoutingMode.ROUND_ROBIN);
        } else {
            builder.setTopicRouter((TopicRouter)new CustomTopicRouter(this.topics));
        }
        this.setSinkBuilder((PulsarSinkBuilder<String>)builder);
        return builder.build();
    }

    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
        PulsarPartitionDataReader<String> reader = this.createSinkDataReader(this.topics);
        this.closer.register(reader);
        return reader;
    }

    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
        Random random = new Random(seed);
        int recordSize = random.nextInt(200) + 100;
        ArrayList<String> records = new ArrayList<String>(recordSize);
        for (int i = 0; i < recordSize; ++i) {
            int size = random.nextInt(20) + 20;
            String record = "index:" + i + "-data:" + RandomStringUtils.randomAlphanumeric((int)size);
            records.add(record);
        }
        return records;
    }

    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    @Override
    public void close() throws Exception {
        this.topics = this.generateTopics();
        this.closer.close();
    }

    protected List<String> generateTopics() {
        String topicName = TOPIC_NAME_PREFIX + RandomStringUtils.randomAlphanumeric((int)8);
        return Collections.singletonList(topicName);
    }

    protected void setSinkBuilder(PulsarSinkBuilder<String> builder) {
    }

    protected boolean creatTopic() {
        return true;
    }

    protected PulsarPartitionDataReader<String> createSinkDataReader(List<String> topics) {
        return new PulsarPartitionDataReader<String>(this.operator, topics, Schema.STRING);
    }

    public static class CustomTopicRouter
    implements TopicRouter<String> {
        private static final long serialVersionUID = 1698701183626468094L;
        private final List<String> topics;
        private final AtomicInteger counter;

        public CustomTopicRouter(List<String> topics) {
            this.topics = topics;
            this.counter = new AtomicInteger(0);
        }

        public TopicPartition route(String s, String key, List<TopicPartition> partitions, PulsarSinkContext context) {
            int index = this.counter.incrementAndGet() / 10 % this.topics.size();
            String topic = this.topics.get(index);
            return new TopicPartition(topic);
        }
    }
}

