package com.redhat.insights.kafka.connect.transforms;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.insights.kafka.connect.transforms.KeyOrValueTransformation;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/redhat/insights/kafka/connect/transforms/InjectSchema.class */
/**
 * Transformation that attaches a fixed, pre-configured Connect {@link Schema} to either the key
 * or the value of every record it processes.
 *
 * <p>The schema is supplied as a JSON string through the {@code schema} configuration property
 * and parsed once in {@link #configure(Map, AbstractConfig)} using
 * {@link JsonConverter#asConnectSchema(JsonNode)}. Use the {@link Key} or {@link Value} subclass
 * to select which part of the record is affected.
 *
 * @param <T> the concrete record type handled by this transformation
 */
public abstract class InjectSchema<T extends ConnectRecord<T>> extends AbstractTransformation<T> implements KeyOrValueTransformation<T> {
    private static final Logger LOG = LoggerFactory.getLogger(InjectSchema.class);
    private static final String CONFIG_FIELD = "schema";

    /** Shared JSON parser; only used to read the configured schema string into a tree. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Schema parsed from configuration; written by {@code configure}, read by {@code apply}. */
    private volatile Schema schema;

    /** Variant of {@link InjectSchema} bound to {@link KeyOrValueTransformation.Key}. */
    public static class Key<T extends ConnectRecord<T>> extends InjectSchema<T> implements KeyOrValueTransformation.Key<T> {
    }

    /** Variant of {@link InjectSchema} bound to {@link KeyOrValueTransformation.Value}. */
    public static class Value<T extends ConnectRecord<T>> extends InjectSchema<T> implements KeyOrValueTransformation.Value<T> {
    }

    /**
     * Creates the transformation and registers its single configuration property,
     * {@code schema}, which has no default value.
     */
    public InjectSchema() {
        super(new ConfigDef().define(CONFIG_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Schema to inject. The schema is expected to be in JSON Schema format."));
    }

    /**
     * Parses the configured schema string and stores the resulting Connect schema for use by
     * {@link #apply(ConnectRecord)}.
     *
     * <p>The JSON text is interpreted by {@link JsonConverter#asConnectSchema(JsonNode)}, so it
     * must be in the representation that converter understands.
     *
     * @param map            the raw configuration properties (not read directly here)
     * @param abstractConfig the parsed configuration from which the {@code schema} string is read
     * @throws ConnectException if the schema string cannot be parsed as JSON, or if it does not
     *                          yield a schema
     */
    @Override // com.redhat.insights.kafka.connect.transforms.AbstractTransformation
    public void configure(Map<String, ?> map, AbstractConfig abstractConfig) {
        String schemaJson = abstractConfig.getString(CONFIG_FIELD);
        try {
            JsonConverter jsonConverter = new JsonConverter();
            try {
                JsonNode schemaNode = OBJECT_MAPPER.readTree(schemaJson);
                jsonConverter.configure(Collections.singletonMap("converter.type", "value"));
                Schema parsedSchema = jsonConverter.asConnectSchema(schemaNode);
                if (parsedSchema == null) {
                    // Fail with a descriptive error instead of a NullPointerException later on.
                    throw new ConnectException("Configuration property '" + CONFIG_FIELD + "' did not yield a schema");
                }
                // Publish the field only once a valid schema is available.
                this.schema = parsedSchema;
                LOG.info("Using schema {}", parsedSchema);
            } finally {
                // Release the converter on every path, including parse/conversion failures.
                jsonConverter.close();
            }
        } catch (IOException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Adapts the top-level object to the given schema: a {@link Map} is turned into a struct via
     * {@code Utils.mapToStruct} when the schema type is {@code STRUCT}; any other combination is
     * returned untouched.
     *
     * @param schema the schema the object should match
     * @param obj    the record key or value; may be any object
     * @return the converted struct, or {@code obj} itself when no conversion applies
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the raw Map is handed to Utils.mapToStruct unchanged
    private Object convertTopLevelObjectToMatchSchema(Schema schema, Object obj) {
        if (!(obj instanceof Map) || schema.type() != Schema.Type.STRUCT) {
            return obj;
        }
        return Utils.mapToStruct((Map) Utils.cast(obj), schema);
    }

    /**
     * Produces a new record whose selected part (key or value, depending on the subclass)
     * carries the configured schema.
     *
     * @param t the incoming record
     * @return the record created by {@code newRecord} from the possibly converted object and the
     *         configured schema
     */
    public T apply(T t) {
        LOG.debug("Injecting schema to record {}", t);
        Schema targetSchema = this.schema; // read the volatile field once for a consistent view
        Object converted = convertTopLevelObjectToMatchSchema(targetSchema, getObject(t));
        return newRecord(t, converted, targetSchema);
    }
}
