package org.apache.eventmesh.storage.pravega.client;

import io.cloudevents.CloudEvent;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.shared.NameUtils;
import io.pravega.shared.security.auth.DefaultCredentials;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.storage.pravega.config.PravegaStorageConfig;
import org.apache.eventmesh.storage.pravega.exception.PravegaStorageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/storage/pravega/client/PravegaClient.class */
public class PravegaClient {
    private static final Logger log = LoggerFactory.getLogger(PravegaClient.class);
    private final PravegaStorageConfig config;
    private final StreamManager streamManager;
    private final EventStreamClientFactory clientFactory;
    private final ReaderGroupManager readerGroupManager;
    private final Map<String, EventStreamWriter<byte[]>> writerMap = new ConcurrentHashMap();
    private final Map<String, SubscribeTask> subscribeTaskMap = new ConcurrentHashMap();
    private static PravegaClient instance;

    public static PravegaClient getInstance() {
        return instance;
    }

    public static PravegaClient getInstance(PravegaStorageConfig pravegaStorageConfig) {
        if (instance == null) {
            instance = new PravegaClient(pravegaStorageConfig);
        }
        return instance;
    }

    private PravegaClient(PravegaStorageConfig pravegaStorageConfig) {
        this.config = pravegaStorageConfig;
        this.streamManager = StreamManager.create(pravegaStorageConfig.getControllerURI());
        ClientConfig.ClientConfigBuilder controllerURI = ClientConfig.builder().controllerURI(pravegaStorageConfig.getControllerURI());
        if (pravegaStorageConfig.isAuthEnabled()) {
            controllerURI.credentials(new DefaultCredentials(pravegaStorageConfig.getPassword(), pravegaStorageConfig.getUsername()));
        }
        if (pravegaStorageConfig.isTlsEnable()) {
            controllerURI.trustStore(pravegaStorageConfig.getTruststore()).validateHostName(false);
        }
        ClientConfig build = controllerURI.build();
        this.clientFactory = EventStreamClientFactory.withScope(pravegaStorageConfig.getScope(), build);
        this.readerGroupManager = ReaderGroupManager.withScope(pravegaStorageConfig.getScope(), build);
    }

    protected static PravegaClient getNewInstance(PravegaStorageConfig pravegaStorageConfig) {
        return new PravegaClient(pravegaStorageConfig);
    }

    public void start() {
        if (createScope()) {
            log.info("Create Pravega scope[{}] success.", this.config.getScope());
        } else {
            log.info("Pravega scope[{}] has already been created.", this.config.getScope());
        }
    }

    public void shutdown() {
        this.subscribeTaskMap.forEach((str, subscribeTask) -> {
            subscribeTask.stopRead();
        });
        this.subscribeTaskMap.clear();
        this.writerMap.forEach((str2, eventStreamWriter) -> {
            eventStreamWriter.close();
        });
        this.writerMap.clear();
        this.readerGroupManager.close();
        this.clientFactory.close();
        this.streamManager.close();
    }

    public SendResult publish(String str, CloudEvent cloudEvent) {
        if (!createStream(str)) {
            log.debug("stream[{}] has already been created.", str);
        }
        try {
            this.writerMap.computeIfAbsent(str, str2 -> {
                return createWrite(str);
            }).writeEvent(PravegaEvent.toByteArray((PravegaEvent) new PravegaCloudEventWriter(str).writeBinary(cloudEvent))).get(5L, TimeUnit.SECONDS);
            SendResult sendResult = new SendResult();
            sendResult.setTopic(str);
            sendResult.setMessageId("-1");
            return sendResult;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error(String.format("Write topic[%s] fail.", str), e);
            throw new PravegaStorageException(String.format("Write topic[%s] fail.", str));
        }
    }

    public boolean subscribe(String str, boolean z, String str2, String str3, EventListener eventListener) {
        if (this.subscribeTaskMap.containsKey(str)) {
            return true;
        }
        String buildReaderGroupName = buildReaderGroupName(z, str2, str);
        createReaderGroup(str, buildReaderGroupName);
        SubscribeTask subscribeTask = new SubscribeTask(str, createReader(buildReaderId(str3), buildReaderGroupName), eventListener);
        subscribeTask.start();
        this.subscribeTaskMap.put(str, subscribeTask);
        return true;
    }

    public boolean unsubscribe(String str, boolean z, String str2) {
        if (!this.subscribeTaskMap.containsKey(str)) {
            return true;
        }
        if (!z) {
            deleteReaderGroup(buildReaderGroupName(false, str2, str));
        }
        this.subscribeTaskMap.remove(str).stopRead();
        return true;
    }

    public boolean checkTopicExist(String str) {
        return this.streamManager.checkStreamExists(this.config.getScope(), str);
    }

    private boolean createScope() {
        return this.streamManager.createScope(this.config.getScope());
    }

    private boolean createStream(String str) {
        return this.streamManager.createStream(this.config.getScope(), str, StreamConfiguration.builder().build());
    }

    private EventStreamWriter<byte[]> createWrite(String str) {
        return this.clientFactory.createEventWriter(str, new ByteArraySerializer(), EventWriterConfig.builder().build());
    }

    private String buildReaderGroupName(boolean z, String str, String str2) {
        return z ? UUID.randomUUID().toString() : String.format("%s-%s", str, str2);
    }

    private String buildReaderId(String str) {
        return String.format("%s-reader", str).replace("\\(", "-").replace("\\)", "-");
    }

    private void createReaderGroup(String str, String str2) {
        if (!checkTopicExist(str)) {
            createStream(str);
        }
        this.readerGroupManager.createReaderGroup(str2, ReaderGroupConfig.builder().stream(NameUtils.getScopedStreamName(this.config.getScope(), str)).retentionType(ReaderGroupConfig.StreamDataRetention.AUTOMATIC_RELEASE_AT_LAST_CHECKPOINT).build());
    }

    private void deleteReaderGroup(String str) {
        this.readerGroupManager.deleteReaderGroup(str);
    }

    private EventStreamReader<byte[]> createReader(String str, String str2) {
        return this.clientFactory.createReader(str, str2, new ByteArraySerializer(), ReaderConfig.builder().build());
    }

    protected StreamManager getStreamManager() {
        return this.streamManager;
    }

    protected EventStreamClientFactory getClientFactory() {
        return this.clientFactory;
    }

    protected ReaderGroupManager getReaderGroupManager() {
        return this.readerGroupManager;
    }

    public Map<String, EventStreamWriter<byte[]>> getWriterMap() {
        return this.writerMap;
    }

    protected Map<String, SubscribeTask> getSubscribeTaskMap() {
        return this.subscribeTaskMap;
    }
}
