package org.apache.flink.connector.pulsar.testutils.sink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder;
import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.pulsar.testutils.sink.reader.PulsarPartitionDataReader;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.shade.com.google.common.io.Closer;

/* loaded from: input_file:org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.class */
public abstract class PulsarSinkTestContext extends PulsarTestContext<String> implements DataStreamSinkV2ExternalContext<String> {
    private static final String TOPIC_NAME_PREFIX = "persistent://public/default/flink-sink-topic-";
    private static final int RECORD_SIZE_UPPER_BOUND = 300;
    private static final int RECORD_SIZE_LOWER_BOUND = 100;
    private static final int RECORD_STRING_SIZE = 20;
    private List<String> topics;
    private final Closer closer;

    /* loaded from: input_file:org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext$CustomTopicRouter.class */
    public static class CustomTopicRouter implements TopicRouter<String> {
        private static final long serialVersionUID = 1698701183626468094L;
        private final List<String> topics;
        private final AtomicInteger counter = new AtomicInteger(0);

        public CustomTopicRouter(List<String> list) {
            this.topics = list;
        }

        public TopicPartition route(String str, String str2, List<TopicPartition> list, PulsarSinkContext pulsarSinkContext) {
            return new TopicPartition(this.topics.get((this.counter.incrementAndGet() / 10) % this.topics.size()));
        }

        public /* bridge */ /* synthetic */ TopicPartition route(Object obj, String str, List list, PulsarSinkContext pulsarSinkContext) {
            return route((String) obj, str, (List<TopicPartition>) list, pulsarSinkContext);
        }
    }

    public PulsarSinkTestContext(PulsarTestEnvironment pulsarTestEnvironment) {
        super(pulsarTestEnvironment, Schema.STRING);
        this.topics = generateTopics();
        this.closer = Closer.create();
    }

    public Sink<String> createSink(TestingSinkSettings testingSinkSettings) {
        if (creatTopic()) {
            Iterator<String> it = this.topics.iterator();
            while (it.hasNext()) {
                try {
                    this.operator.createTopic(it.next(), 4);
                } catch (Exception e) {
                    throw new FlinkRuntimeException(e);
                }
            }
        }
        PulsarSinkBuilder<String> config = PulsarSink.builder().setServiceUrl(this.operator.serviceUrl()).setDeliveryGuarantee(PulsarTestCommonUtils.toDeliveryGuarantee(testingSinkSettings.getCheckpointingMode())).setSerializationSchema(this.schema).enableSchemaEvolution().setConfig(PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES, 4);
        if (creatTopic()) {
            config.setTopics(this.topics).setTopicRoutingMode(TopicRoutingMode.ROUND_ROBIN);
        } else {
            config.setTopicRouter(new CustomTopicRouter(this.topics));
        }
        setSinkBuilder(config);
        return config.build();
    }

    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings testingSinkSettings) {
        PulsarPartitionDataReader<String> createSinkDataReader = createSinkDataReader(this.topics);
        this.closer.register(createSinkDataReader);
        return createSinkDataReader;
    }

    public List<String> generateTestData(TestingSinkSettings testingSinkSettings, long j) {
        Random random = new Random(j);
        int nextInt = random.nextInt(200) + RECORD_SIZE_LOWER_BOUND;
        ArrayList arrayList = new ArrayList(nextInt);
        for (int i = 0; i < nextInt; i++) {
            arrayList.add("index:" + i + "-data:" + RandomStringUtils.randomAlphanumeric(random.nextInt(20) + 20));
        }
        return arrayList;
    }

    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    @Override // org.apache.flink.connector.pulsar.testutils.PulsarTestContext
    public void close() throws Exception {
        this.topics = generateTopics();
        this.closer.close();
    }

    protected List<String> generateTopics() {
        return Collections.singletonList(TOPIC_NAME_PREFIX + RandomStringUtils.randomAlphanumeric(8));
    }

    protected void setSinkBuilder(PulsarSinkBuilder<String> pulsarSinkBuilder) {
    }

    protected boolean creatTopic() {
        return true;
    }

    protected PulsarPartitionDataReader<String> createSinkDataReader(List<String> list) {
        return new PulsarPartitionDataReader<>(this.operator, list, Schema.STRING);
    }
}
