/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.testutils.sink;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.pulsar.testutils.sink.PulsarPartitionDataReader;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.pulsar.client.api.Schema;

public class PulsarSinkTestContext
extends PulsarTestContext<String>
implements DataStreamSinkV2ExternalContext<String> {
    private static final String TOPIC_NAME_PREFIX = "flink-sink-topic-";
    private static final int RECORD_SIZE_UPPER_BOUND = 300;
    private static final int RECORD_SIZE_LOWER_BOUND = 100;
    private static final int RECORD_STRING_SIZE = 20;
    private String topicName = this.topicName();
    private final Closer closer = Closer.create();

    public PulsarSinkTestContext(PulsarTestEnvironment environment) {
        super(environment, Schema.STRING);
    }

    @Override
    protected String displayName() {
        return "write messages into one topic in Pulsar";
    }

    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
        if (!this.operator.topicExists(this.topicName)) {
            this.operator.createTopic(this.topicName, 4);
        }
        DeliveryGuarantee guarantee = PulsarTestCommonUtils.toDeliveryGuarantee(sinkSettings.getCheckpointingMode());
        return PulsarSink.builder().setServiceUrl(this.operator.serviceUrl()).setAdminUrl(this.operator.adminUrl()).setTopics(new String[]{this.topicName}).setDeliveryGuarantee(guarantee).setSerializationSchema(PulsarSerializationSchema.pulsarSchema((Schema)Schema.STRING)).enableSchemaEvolution().setConfig(PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES, (Object)4).build();
    }

    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
        PulsarPartitionDataReader reader = (PulsarPartitionDataReader)PulsarExceptionUtils.sneakyClient(() -> new PulsarPartitionDataReader(this.operator, this.topicName, Schema.STRING));
        this.closer.register((Closeable)reader);
        return reader;
    }

    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
        Random random = new Random(seed);
        int recordSize = random.nextInt(200) + 100;
        ArrayList<String> records = new ArrayList<String>(recordSize);
        for (int i = 0; i < recordSize; ++i) {
            int size = random.nextInt(20) + 20;
            String record = "index:" + i + "-data:" + RandomStringUtils.randomAlphanumeric((int)size);
            records.add(record);
        }
        return records;
    }

    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    @Override
    public void close() throws Exception {
        this.closer.register(() -> {
            this.topicName = this.topicName();
        });
        this.closer.close();
    }

    private String topicName() {
        return TOPIC_NAME_PREFIX + RandomStringUtils.randomAlphanumeric((int)8);
    }
}

