/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.testutils.source;

import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.pulsar.testutils.source.PulsarPartitionDataWriter;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public abstract class PulsarSourceTestContext
extends PulsarTestContext<String>
implements DataStreamSourceExternalContext<String> {
    private static final long DISCOVERY_INTERVAL = 1000L;
    private static final int BATCH_DATA_SIZE = 300;

    protected PulsarSourceTestContext(PulsarTestEnvironment environment) {
        super(environment, Schema.STRING);
    }

    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
        PulsarSourceBuilder builder = PulsarSource.builder().setDeserializationSchema(PulsarDeserializationSchema.pulsarSchema((Schema)this.schema)).setServiceUrl(this.operator.serviceUrl()).setAdminUrl(this.operator.adminUrl()).setTopicPattern(this.topicPattern(), RegexSubscriptionMode.AllTopics).setSubscriptionType(this.subscriptionType()).setSubscriptionName(this.subscriptionName()).setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, (Object)1000L);
        this.setSourceBuilder((PulsarSourceBuilder<String>)builder);
        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
            builder.setBoundedStopCursor(StopCursor.latest());
        }
        return builder.build();
    }

    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(TestingSourceSettings sourceSettings) {
        String partitionName = this.generatePartitionName();
        return new PulsarPartitionDataWriter<String>(this.operator, partitionName, this.schema);
    }

    public List<String> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed) {
        Random random = new Random(seed);
        return IntStream.range(0, 300).boxed().map(index -> {
            int length = random.nextInt(20) + 1;
            return "split:" + splitIndex + "-index:" + index + "-content:" + RandomStringUtils.randomAlphanumeric((int)length);
        }).collect(Collectors.toList());
    }

    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
    }

    protected abstract String topicPattern();

    protected abstract String subscriptionName();

    protected abstract SubscriptionType subscriptionType();

    protected abstract String generatePartitionName();
}

