package io.confluent.connect.rest.datapreview.extension.service;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.connect.rest.datapreview.extension.entities.ConnectorDataPreviewRequest;
import io.confluent.connect.rest.datapreview.extension.util.CreateConnectorRequestTranslator;
import io.confluent.connect.rest.datapreview.extension.util.PreviewRecordTransformer;
import java.util.Map;
import java.util.stream.Collectors;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.transforms.Filter;
import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/rest/datapreview/extension/service/ConnectorDataPreviewService.class */
/**
 * Turns a {@link ConnectorDataPreviewRequest} into a regular connector creation call.
 *
 * <p>Before delegating to {@link ConnectorsResource#putConnectorConfig}, the request's config map
 * is mutated in place: a preview id is assigned, the {@code tasks.max} limit is enforced, a
 * {@link Filter} transform is appended for non-sink connectors, and the {@code trace.records.*}
 * settings are populated.
 */
public class ConnectorDataPreviewService {
    private static final Logger log = LoggerFactory.getLogger(ConnectorDataPreviewService.class);

    /** Config key under which the preview id is stored in the connector config. */
    public static final String DATA_PREVIEW_ID_PROPERTY = "datapreview.id";

    private static final String DATA_PREVIEW_PREFIX = "data-preview";
    private static final String DELIMITER = "-";
    private static final String DATA_PREVIEW_INTERNAL_PREFIX = "data-preview-internal-";
    private static final String DATA_FILTER_TRANSFORM_NAME = DATA_PREVIEW_INTERNAL_PREFIX + "dataFilter";
    private static final String DATA_FILTER_PREDICATE_NAME = DATA_PREVIEW_INTERNAL_PREFIX + "dataTopicMatcher";
    private static final String SCHEMA_ENABLE_SUFFIX = ".schemas.enable";

    private static final String CONNECTOR_CLASS_KEY = "connector.class";
    private static final String TASKS_MAX_KEY = "tasks.max";
    private static final String TRANSFORMS_KEY = "transforms";
    private static final String PREDICATES_KEY = "predicates";
    private static final String TRACE_TOPIC_KEY = "trace.records.topic";

    private final ConnectorsResource connectorsResource;
    private final Map<String, String> workerConfigs;
    private final CreateConnectorRequestTranslator createConnectorRequestTranslator;
    private final Plugins plugins;

    /**
     * Creates the service from the worker configuration.
     *
     * @param connectorsResource resource used to submit the final connector config
     * @param map worker configuration; values are converted to strings, entries with a
     *     {@code null} value are ignored
     */
    public ConnectorDataPreviewService(ConnectorsResource connectorsResource, Map<String, Object> map) {
        this.connectorsResource = connectorsResource;
        // String.valueOf instead of a (String) cast: a non-String value must not abort construction
        // with a ClassCastException, and Collectors.toMap rejects null values, so those are skipped.
        this.workerConfigs = map.entrySet().stream()
                .filter(entry -> entry.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())));
        this.createConnectorRequestTranslator = new CreateConnectorRequestTranslator();
        this.plugins = new Plugins(this.workerConfigs);
    }

    /**
     * Test-only constructor that accepts every collaborator explicitly.
     *
     * @param connectorsResource resource used to submit the final connector config
     * @param map worker configuration, used as-is
     * @param createConnectorRequestTranslator translator from preview request to create request
     * @param plugins plugin registry used to resolve connector classes
     */
    @VisibleForTesting
    ConnectorDataPreviewService(ConnectorsResource connectorsResource, Map<String, String> map, CreateConnectorRequestTranslator createConnectorRequestTranslator, Plugins plugins) {
        this.connectorsResource = connectorsResource;
        this.workerConfigs = map;
        this.createConnectorRequestTranslator = createConnectorRequestTranslator;
        this.plugins = plugins;
    }

    /**
     * Prepares the preview request's config and submits it as a connector config.
     *
     * @param connectorDataPreviewRequest preview request; its config map is modified in place
     * @param httpHeaders headers forwarded to {@link ConnectorsResource#putConnectorConfig}
     * @return the response produced by {@link ConnectorsResource#putConnectorConfig}
     * @throws BadRequestException if the request or its config is missing, or {@code tasks.max}
     *     is invalid or exceeds the configured limit
     * @throws Throwable whatever the underlying connectors resource throws
     */
    public Response createDataPreview(ConnectorDataPreviewRequest connectorDataPreviewRequest, HttpHeaders httpHeaders) throws Throwable {
        if (connectorDataPreviewRequest == null || connectorDataPreviewRequest.config() == null) {
            // Report a client error instead of failing later with a NullPointerException.
            throw new BadRequestException("Data preview request must contain a connector config");
        }
        Map<String, String> config = connectorDataPreviewRequest.config();
        String previewId = generatePreviewId(connectorDataPreviewRequest);
        String traceTopic = generateTraceTopicName(connectorDataPreviewRequest, previewId);
        validateTaskLimits(config);
        config.putIfAbsent(DATA_PREVIEW_ID_PROPERTY, previewId);
        applyFilterTransforms(connectorDataPreviewRequest);
        addTraceConfigs(connectorDataPreviewRequest, traceTopic);
        CreateConnectorRequest createRequest = this.createConnectorRequestTranslator.apply(connectorDataPreviewRequest);
        return this.connectorsResource.putConnectorConfig(createRequest.name(), httpHeaders, (Boolean) null, createRequest.config());
    }

    /**
     * Adds the {@code trace.records.*} settings to the request's config.
     *
     * <p>Tracing is always switched on with schemaless {@link JsonConverter}s for key and value;
     * an existing {@code trace.records.topic} is kept. For source connectors a
     * {@link PreviewRecordTransformer} trace transform is registered as well.
     *
     * @param connectorDataPreviewRequest preview request; its config map is modified in place
     * @param str trace topic name used when the config does not already define one
     */
    @VisibleForTesting
    void addTraceConfigs(ConnectorDataPreviewRequest connectorDataPreviewRequest, String str) {
        Map<String, String> config = connectorDataPreviewRequest.config();
        String jsonConverter = JsonConverter.class.getName();
        config.put("trace.records.enable", "true");
        config.putIfAbsent(TRACE_TOPIC_KEY, str);
        config.put("trace.records.value.converter", jsonConverter);
        config.put("trace.records.value.converter" + SCHEMA_ENABLE_SUFFIX, "false");
        config.put("trace.records.key.converter", jsonConverter);
        config.put("trace.records.key.converter" + SCHEMA_ENABLE_SUFFIX, "false");
        if (getConnectorType(config.get(CONNECTOR_CLASS_KEY)) == ConnectorType.SOURCE) {
            config.put("trace.records.transforms", "removeDataFilterMessages");
            config.put("trace.records.transforms.removeDataFilterMessages.type", PreviewRecordTransformer.class.getName());
            // Step count is the number of configured transformations plus one.
            // NOTE(review): the extra step presumably accounts for the pre-transform record - confirm.
            int totalSteps = new ConnectorConfig(this.plugins, config).transformations().size() + 1;
            config.put("trace.records.transforms.removeDataFilterMessages.expected.total.steps", Integer.toString(totalSteps));
        }
    }

    /**
     * Picks the trace topic name for a preview.
     *
     * @param connectorDataPreviewRequest preview request whose config may define
     *     {@code trace.records.topic}
     * @param str fallback topic name
     * @return the configured {@code trace.records.topic}, or {@code str} when none is configured
     */
    String generateTraceTopicName(ConnectorDataPreviewRequest connectorDataPreviewRequest, String str) {
        String configuredTopic = connectorDataPreviewRequest.config().get(TRACE_TOPIC_KEY);
        return configuredTopic == null ? str : configuredTopic;
    }

    /**
     * Determines the preview id for a request.
     *
     * @param connectorDataPreviewRequest preview request
     * @return the non-empty {@value #DATA_PREVIEW_ID_PROPERTY} value from the config, otherwise
     *     {@code "data-preview-"} followed by the config's {@code name} (empty if absent)
     */
    @VisibleForTesting
    String generatePreviewId(ConnectorDataPreviewRequest connectorDataPreviewRequest) {
        Map<String, String> config = connectorDataPreviewRequest.config();
        String previewId = config.getOrDefault(DATA_PREVIEW_ID_PROPERTY, "");
        if (!previewId.isEmpty()) {
            return previewId;
        }
        return String.join(DELIMITER, DATA_PREVIEW_PREFIX, config.getOrDefault("name", ""));
    }

    /**
     * Resolves whether a connector class is a source or a sink.
     *
     * @param str connector class name or alias, as given in {@code connector.class}
     * @return the connector type reported by {@link ConnectorType#from}
     * @throws ConnectException if the class cannot be resolved; the original failure is attached
     *     as the cause
     */
    @VisibleForTesting
    public ConnectorType getConnectorType(String str) {
        try {
            return ConnectorType.from(this.plugins.connectorClass(str));
        } catch (Exception e) {
            log.error("Failed to find any class that implements Connector and which name matches {}", str, e);
            // Keep the cause so callers can see why the lookup failed.
            throw new ConnectException("Failed to find any class that implements Connector and which name matches " + str, e);
        }
    }

    /**
     * Registers the internal {@link Filter} transform and its {@link TopicNameMatches} predicate
     * on the request's config. Sink connectors are left untouched.
     *
     * <p>The internal aliases are appended to any user-defined {@code transforms} and
     * {@code predicates} lists, and are not added twice if already present.
     *
     * @param connectorDataPreviewRequest preview request; its config map is modified in place
     * @throws ConnectException if the connector class cannot be resolved
     */
    @VisibleForTesting
    public void applyFilterTransforms(ConnectorDataPreviewRequest connectorDataPreviewRequest) {
        Map<String, String> config = connectorDataPreviewRequest.config();
        if (getConnectorType(config.get(CONNECTOR_CLASS_KEY)) == ConnectorType.SINK) {
            return;
        }
        String transformPrefix = TRANSFORMS_KEY + "." + DATA_FILTER_TRANSFORM_NAME;
        String predicatePrefix = PREDICATES_KEY + "." + DATA_FILTER_PREDICATE_NAME;

        appendAlias(config, TRANSFORMS_KEY, DATA_FILTER_TRANSFORM_NAME);
        config.putIfAbsent(transformPrefix + ".type", Filter.class.getName());
        // NOTE(review): Kafka's documented key for binding a predicate to a transform is
        // "predicate" (singular); this plural key is kept as-is to preserve the emitted config - confirm.
        config.putIfAbsent(transformPrefix + ".predicates", DATA_FILTER_PREDICATE_NAME);
        // Append rather than putIfAbsent so the alias is registered even when the user
        // already declares predicates of their own.
        appendAlias(config, PREDICATES_KEY, DATA_FILTER_PREDICATE_NAME);
        config.putIfAbsent(predicatePrefix + ".type", TopicNameMatches.class.getName());
        config.putIfAbsent(predicatePrefix + ".pattern", ".*");
    }

    /**
     * Rejects configs whose {@code tasks.max} exceeds the worker's data preview task limit.
     *
     * @param map connector config; {@code tasks.max} defaults to 1 when absent
     * @throws BadRequestException if {@code tasks.max} is not an integer or is greater than the
     *     configured limit
     */
    public void validateTaskLimits(Map<String, String> map) {
        String requestedRaw = map.getOrDefault(TASKS_MAX_KEY, String.valueOf(1));
        int requestedTasks;
        try {
            if (requestedRaw == null) {
                throw new NumberFormatException("null");
            }
            requestedTasks = Integer.parseInt(requestedRaw.trim());
        } catch (NumberFormatException e) {
            // A malformed client value is a client error, not an internal failure.
            throw new BadRequestException(String.format("The value of %s must be an integer but was '%s'", TASKS_MAX_KEY, requestedRaw), e);
        }
        int allowedTasks = Integer.parseInt(this.workerConfigs.getOrDefault(ConnectorDataPreviewConfig.DATA_PREVIEW_MAX_TASKS_CONFIG, String.valueOf(ConnectorDataPreviewConfig.DATA_PREVIEW_MAX_TASKS_DEFAULT)));
        if (requestedTasks > allowedTasks) {
            throw new BadRequestException(String.format("The value of %s should not be greater than %s", TASKS_MAX_KEY, ConnectorDataPreviewConfig.DATA_PREVIEW_MAX_TASKS_CONFIG));
        }
    }

    /** Appends {@code alias} to the comma-separated list stored under {@code listKey}, unless it is already listed. */
    private static void appendAlias(Map<String, String> config, String listKey, String alias) {
        String existing = config.get(listKey);
        if (existing == null || existing.trim().isEmpty()) {
            config.put(listKey, alias);
            return;
        }
        for (String entry : existing.split(",")) {
            if (alias.equals(entry.trim())) {
                return;
            }
        }
        config.put(listKey, existing + "," + alias);
    }
}
