package org.apache.flink.connector.pulsar.testutils.source;

import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

/* loaded from: input_file:org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.class */
public abstract class PulsarSourceTestContext extends PulsarTestContext<String> implements DataStreamSourceExternalContext<String> {
    private static final long DISCOVERY_INTERVAL = 1000;
    private static final int BATCH_DATA_SIZE = 300;

    /* JADX INFO: Access modifiers changed from: protected */
    public PulsarSourceTestContext(PulsarTestEnvironment pulsarTestEnvironment) {
        super(pulsarTestEnvironment, Schema.STRING);
    }

    public Source<String, ?, ?> createSource(TestingSourceSettings testingSourceSettings) {
        PulsarSourceBuilder<String> config = PulsarSource.builder().setDeserializationSchema(PulsarDeserializationSchema.pulsarSchema(this.schema)).setServiceUrl(this.operator.serviceUrl()).setAdminUrl(this.operator.adminUrl()).setTopicPattern(topicPattern(), RegexSubscriptionMode.AllTopics).setSubscriptionType(subscriptionType()).setSubscriptionName(subscriptionName()).setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, Long.valueOf(DISCOVERY_INTERVAL));
        setSourceBuilder(config);
        if (testingSourceSettings.getBoundedness() == Boundedness.BOUNDED) {
            config.setBoundedStopCursor(StopCursor.latest());
        }
        return config.build();
    }

    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(TestingSourceSettings testingSourceSettings) {
        return new PulsarPartitionDataWriter(this.operator, generatePartitionName(), this.schema);
    }

    public List<String> generateTestData(TestingSourceSettings testingSourceSettings, int i, long j) {
        Random random = new Random(j);
        return (List) IntStream.range(0, BATCH_DATA_SIZE).boxed().map(num -> {
            return "split:" + i + "-index:" + num + "-content:" + RandomStringUtils.randomAlphanumeric(random.nextInt(20) + 1);
        }).collect(Collectors.toList());
    }

    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    protected void setSourceBuilder(PulsarSourceBuilder<String> pulsarSourceBuilder) {
    }

    protected abstract String topicPattern();

    protected abstract String subscriptionName();

    protected abstract SubscriptionType subscriptionType();

    protected abstract String generatePartitionName();
}
