package org.apache.flink.connector.pulsar.testutils.sink;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.pulsar.client.api.Schema;

/* loaded from: input_file:org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.class */
public class PulsarSinkTestContext extends PulsarTestContext<String> implements DataStreamSinkV2ExternalContext<String> {
    private static final String TOPIC_NAME_PREFIX = "flink-sink-topic-";
    private static final int RECORD_SIZE_UPPER_BOUND = 300;
    private static final int RECORD_SIZE_LOWER_BOUND = 100;
    private static final int RECORD_STRING_SIZE = 20;
    private String topicName;
    private final Closer closer;

    public PulsarSinkTestContext(PulsarTestEnvironment pulsarTestEnvironment) {
        super(pulsarTestEnvironment, Schema.STRING);
        this.topicName = topicName();
        this.closer = Closer.create();
    }

    @Override // org.apache.flink.connector.pulsar.testutils.PulsarTestContext
    protected String displayName() {
        return "write messages into one topic in Pulsar";
    }

    public Sink<String> createSink(TestingSinkSettings testingSinkSettings) {
        this.operator.createTopic(this.topicName, 4);
        return PulsarSink.builder().setServiceUrl(this.operator.serviceUrl()).setAdminUrl(this.operator.adminUrl()).setTopics(new String[]{this.topicName}).setDeliveryGuarantee(PulsarTestCommonUtils.toDeliveryGuarantee(testingSinkSettings.getCheckpointingMode())).setSerializationSchema(PulsarSerializationSchema.pulsarSchema(this.schema)).enableSchemaEvolution().setConfig(PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES, 4).build();
    }

    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings testingSinkSettings) {
        PulsarPartitionDataReader pulsarPartitionDataReader = (PulsarPartitionDataReader) PulsarExceptionUtils.sneakyClient(() -> {
            return new PulsarPartitionDataReader(this.operator, this.topicName, Schema.STRING);
        });
        this.closer.register(pulsarPartitionDataReader);
        return pulsarPartitionDataReader;
    }

    public List<String> generateTestData(TestingSinkSettings testingSinkSettings, long j) {
        Random random = new Random(j);
        int nextInt = random.nextInt(200) + RECORD_SIZE_LOWER_BOUND;
        ArrayList arrayList = new ArrayList(nextInt);
        for (int i = 0; i < nextInt; i++) {
            arrayList.add("index:" + i + "-data:" + RandomStringUtils.randomAlphanumeric(random.nextInt(20) + 20));
        }
        return arrayList;
    }

    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    @Override // org.apache.flink.connector.pulsar.testutils.PulsarTestContext
    public void close() throws Exception {
        this.closer.register(() -> {
            this.topicName = topicName();
        });
        this.closer.close();
    }

    private String topicName() {
        return TOPIC_NAME_PREFIX + RandomStringUtils.randomAlphanumeric(8);
    }
}
