package io.confluent.connect.security;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.connect.avro.AvroConverter;
import io.confluent.connect.json.JsonSchemaConverter;
import io.confluent.connect.protobuf.ProtobufConverter;
import io.confluent.connect.security.config.manipulation.RbacBasicCredentialsManipulator;
import io.confluent.connect.security.rbac.ConnectActions;
import io.confluent.connect.security.rbac.ConnectorOperations;
import io.confluent.connect.security.util.ConnectRestApiMethods;
import io.confluent.connect.security.util.ConnectRestUtils;
import io.confluent.rest.entities.ErrorMessage;
import io.confluent.security.authorizer.Action;
import io.confluent.security.authorizer.Operation;
import io.confluent.security.authorizer.Scope;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ResourceInfo;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.storage.Converter;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:io/confluent/connect/security/ConnectSecurityFilterTest.class */
public class ConnectSecurityFilterTest {
    /** Principal that the mocked security context returns in {@link #setup()}. */
    protected static final KafkaPrincipal PRINCIPAL = new KafkaPrincipal("User", "prince");
    /** Scope handed to every filter created through {@link #newFilter(Map)}. */
    protected static final Scope SCOPE = new Scope.Builder(new String[0]).withKafkaCluster("clstrfck").withCluster("connect-cluster", "idfk").build();
    /** Full connector map used as the stubbed response entity in the connector-map filtering tests. */
    protected static final Map<String, Object> CONNECTORS = ImmutableMap.of("connectorOne", ImmutableMap.of("info", 1, "status", 2), "connectorTwo", ImmutableMap.of("info", "blah", "status", 2), "connectorRed", ImmutableMap.of("info", 1), "connectorBlue", ImmutableMap.of("status", 5));
    /** Names of all connectors in {@link #CONNECTORS}; used as the stubbed entity in the connector-list filtering test. */
    protected static final Collection<String> CONNECTOR_NAMES = CONNECTORS.keySet();
    /** Connectors whose names are granted READ_STATUS for {@link #PRINCIPAL} in {@link #setup()}. */
    protected static final Map<String, Object> VISIBLE_CONNECTORS = ImmutableMap.of("connectorOne", ImmutableMap.of("info", 1), "connectorTwo", ImmutableMap.of("status", 1));
    /** Names the filtering tests expect to remain after the response has been filtered. */
    protected static final Collection<String> VISIBLE_CONNECTOR_NAMES = VISIBLE_CONNECTORS.keySet();
    /** Immutable baseline connector configuration; tests obtain a mutable copy via {@link #connectorConfig()}. */
    protected static final Map<String, String> CONNECTOR_CONFIG = ImmutableMap.of("connector.class", "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "topics", "us-west-1-payments", "tasks.max", "69", "connection.url", "https://eshost3:9200");
    /** Value used for the metadata bootstrap server URL entry of {@link #WORKER_CONFIG}. */
    protected static final String MDS_URL = "https://localhost:6969";
    /** Immutable baseline worker configuration; tests obtain a mutable copy via {@link #workerConfig()}. */
    protected static final Map<String, String> WORKER_CONFIG = ImmutableMap.of("confluent.metadata.bootstrap.server.urls", MDS_URL, "key.converter", "Json", "value.converter", "Json");
    /** Resource method used by the "non connectors list request" test; assigned outside the declarations shown here. */
    protected static Method DUMMY_METHOD;
    /** Converter classes that the per-converter translation tests iterate over; assigned outside the declarations shown here. */
    public static final Collection<Class<? extends Converter>> SCHEMA_REGISTRY_CONVERTERS;

    /** Mock passed to the filter constructor in {@link #newFilter(Map)}. */
    @Mock
    protected ConnectActions connectActions;

    /** Mock passed to the filter constructor in {@link #newFilter(Map)}. */
    @Mock
    protected ResourceInfo resourceInfo;

    /** Mocked request; {@link #setup()} stubs its security context to yield {@link #PRINCIPAL}. */
    @Mock
    protected ContainerRequestContext requestContext;

    /** Mocked response whose entity and status the tests stub and verify. */
    @Mock
    protected ContainerResponseContext responseContext;
    /** Authorizer test double, recreated for every test in {@link #setup()}. */
    protected MockAuthorizer authorizer;
    /** Filter under test; built from {@link #WORKER_CONFIG} in {@link #setup()} and replaced by tests that need other worker settings. */
    protected ConnectSecurityFilter connectSecurityFilter;

    /** Captures the connector-name collection passed to {@code setEntity}. */
    @Captor
    private ArgumentCaptor<Collection<String>> connectorsCaptor;

    /** Captures the connector map passed to {@code setEntity}. */
    @Captor
    private ArgumentCaptor<Map<String, Map<String, Object>>> connectorsMapCaptor;

    /** Captures the {@link ConfigInfos} passed to {@code setEntity} by the config-validation tests. */
    @Captor
    private ArgumentCaptor<ConfigInfos> configInfosCaptor;

    /** Captures the stream passed to {@code setEntityStream} so the rewritten request body can be parsed. */
    @Captor
    private ArgumentCaptor<InputStream> requestBodyCaptor;

    /** Captor for {@link Response} values; not used by the tests visible in this part of the file. */
    @Captor
    private ArgumentCaptor<Response> responseCaptor;
    /** Connector name used by the request and response tests. */
    public static final String CONNECTOR_NAME = "idklol";
    /** Value the tests store under {@code principal.service.name}. */
    public static final String USERNAME = "Mark Summer";
    /** Value the tests store under {@code principal.service.password}. */
    public static final String PASSWORD = "turtleIsland4Lyfe!";

    /**
     * Builds the per-test fixture: a fresh authorizer with the baseline grants, a request whose
     * security context reports {@link #PRINCIPAL}, and a filter built from {@link #WORKER_CONFIG}.
     */
    @Before
    public void setup() {
        authorizer = new MockAuthorizer();
        // Every "visible" connector is readable by status; only connectorTwo also exposes its config.
        for (String visibleConnector : VISIBLE_CONNECTOR_NAMES) {
            authorizer.allowConnectorOperations(PRINCIPAL, visibleConnector, ConnectorOperations.READ_STATUS);
        }
        authorizer.allowConnectorOperations(PRINCIPAL, "connectorTwo", ConnectorOperations.READ_CONFIG);

        SecurityContext principalContext = Mockito.mock(SecurityContext.class);
        Mockito.when(requestContext.getSecurityContext()).thenReturn(principalContext);
        Mockito.when(principalContext.getUserPrincipal()).thenReturn(PRINCIPAL);

        connectSecurityFilter = newFilter(WORKER_CONFIG);
    }

    /**
     * Creates a filter wired to this test's scope, mocks and authorizer.
     *
     * @param map worker-level configuration wrapped in a {@link ConnectSecurityExtensionConfig}
     * @return a new filter instance
     */
    protected ConnectSecurityFilter newFilter(Map<String, ?> map) {
        ConnectSecurityExtensionConfig extensionConfig = new ConnectSecurityExtensionConfig(map);
        return new ConnectSecurityFilter(extensionConfig, SCOPE, connectActions, authorizer, resourceInfo);
    }

    /**
     * Returns a fresh, mutable copy of {@link #CONNECTOR_CONFIG} that a test may modify freely.
     *
     * @return a new {@link HashMap} holding the baseline connector configuration
     */
    protected Map<String, String> connectorConfig() {
        // Diamond instead of the raw constructor, so the copy is type-checked.
        return new HashMap<>(CONNECTOR_CONFIG);
    }

    /**
     * Returns a fresh, mutable copy of {@link #WORKER_CONFIG} that a test may modify freely.
     *
     * @return a new {@link HashMap} holding the baseline worker configuration
     */
    protected Map<String, String> workerConfig() {
        // Diamond instead of the raw constructor, so the copy is type-checked.
        return new HashMap<>(WORKER_CONFIG);
    }

    /**
     * A config-validation response must come back with the RBAC group and the RBAC
     * username/password entries added, both with hidden values, while the connector name and
     * error count stay as they were.
     */
    @Test
    public void shouldInjectRbacConfigs() throws Exception {
        expectValidateConfigResponse(new ConfigInfos("com.github.c0urante.kafka.connect.reddit.RedditSourceConnector", 69, Collections.emptyList(), Collections.emptyList()));

        filterResponse();

        Mockito.verify(responseContext).setEntity(configInfosCaptor.capture());
        ConfigInfos injected = configInfosCaptor.getValue();

        Assert.assertEquals("Connector name should be unchanged after RBAC config injection", "com.github.c0urante.kafka.connect.reddit.RedditSourceConnector", injected.name());
        Assert.assertEquals("Error count should be unchanged after RBAC config injection", 69L, injected.errorCount());
        Assert.assertTrue("RBAC group should be injected into config validate response", injected.groups().contains("RBAC"));
        Assert.assertTrue("RBAC username should be injected into config validate response", injected.values().contains(RbacBasicCredentialsManipulator.RBAC_USERNAME_CONFIG_INFO));
        Assert.assertTrue("RBAC password should be injected into config validate response", injected.values().contains(RbacBasicCredentialsManipulator.RBAC_PASSWORD_CONFIG_INFO));

        Optional<String> reportedUsername = injected.values().stream()
                .filter(info -> info.configKey().name().equals("principal.service.name"))
                .map(info -> info.configValue().value())
                .findFirst();
        Assert.assertEquals("RBAC username config value should always be hidden", Optional.of("[hidden]"), reportedUsername);

        Optional<String> reportedPassword = injected.values().stream()
                .filter(info -> info.configKey().name().equals("principal.service.password"))
                .map(info -> info.configValue().value())
                .findFirst();
        Assert.assertEquals("RBAC password config value should always be hidden", Optional.of("[hidden]"), reportedPassword);
    }

    /**
     * With {@code enable.rbac.credential.injection} set to {@code false} on the worker, the
     * config-validation response body must be left alone.
     */
    @Test
    public void shouldNotInjectRbacConfigsWhenDisabled() throws Exception {
        Map<String, String> injectionDisabled = workerConfig();
        injectionDisabled.put("enable.rbac.credential.injection", "false");
        connectSecurityFilter = newFilter(injectionDisabled);

        ConfigInfos validationResult = new ConfigInfos("com.github.c0urante.kafka.connect.reddit.RedditSourceConnector", 69, Collections.emptyList(), Collections.emptyList());
        expectValidateConfigResponse(validationResult);

        filterResponse();

        verifyResponseBodyWasNotAltered();
    }

    /**
     * Create-connector request carrying RBAC credentials and no Schema Registry converter: the
     * rewritten request body must keep the connector name and initial state, pass the SASL/JAAS
     * translation check, and leave all non-RBAC settings as submitted.
     */
    @Test
    public void shouldTranslateRbacConfigsForConnectorCreateNoSrConverter() throws Exception {
        Map<String, String> submitted = connectorConfig();
        submitted.put("principal.service.name", USERNAME);
        submitted.put("principal.service.password", PASSWORD);
        expectCreateConnectorRequest(CONNECTOR_NAME, submitted, CreateConnectorRequest.InitialState.STOPPED);

        filterRequest();

        Mockito.verify(requestContext, Mockito.atLeastOnce()).setEntityStream(requestBodyCaptor.capture());
        InputStream rewrittenBody = requestBodyCaptor.getValue();
        CreateConnectorRequest rewritten = new ObjectMapper().readValue(rewrittenBody, CreateConnectorRequest.class);

        Assert.assertEquals("Connector name should not be modified during RBAC config translation", CONNECTOR_NAME, rewritten.name());
        Assert.assertEquals("Connector's initial state should be preserved during RBAC config translation", CreateConnectorRequest.InitialState.STOPPED, rewritten.initialState());
        verifyRbacSaslJaasConfigTranslation(rewritten.config());
        Assert.assertEquals("No non-RBAC configurations should be changed by RBAC config translation", stripAllRbacConfigs(false, submitted), stripAllRbacConfigs(false, rewritten.config()));
    }

    /** Runs the connector-level converter translation scenario once per entry in {@link #SCHEMA_REGISTRY_CONVERTERS}. */
    @Test
    public void shouldTranslateRbacConfigsForConnectorCreateWithConnectorSpecificSrConverter() throws Exception {
        for (Class<? extends Converter> converterClass : SCHEMA_REGISTRY_CONVERTERS) {
            testRbacConfigTranslationForConnectorCreateWithConnectorSpecificSrConverter(converterClass);
        }
    }

    /**
     * Create-connector scenario in which the converter is configured on the connector itself,
     * using the converter's fully qualified class name.
     *
     * @param cls converter class to configure as {@code key.converter}
     * @throws Exception if filtering the request or parsing the rewritten body fails
     */
    private void testRbacConfigTranslationForConnectorCreateWithConnectorSpecificSrConverter(Class<? extends Converter> cls) throws Exception {
        Map<String, String> connectorConfig = connectorConfig();
        connectorConfig.put("principal.service.name", USERNAME);
        connectorConfig.put("principal.service.password", PASSWORD);

        // Typed map instead of a raw HashMap, so the put/putAll calls below are type-checked.
        Map<String, String> converterConfig = new HashMap<>();
        converterConfig.put("key.converter", cls.getName());
        converterConfig.put("key.converter.schema.registry.url", "https://localhost:4761");
        converterConfig.put("key.converter.key.subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy");
        connectorConfig.putAll(converterConfig);

        expectCreateConnectorRequest(CONNECTOR_NAME, connectorConfig);
        filterRequest();

        Mockito.verify(this.requestContext, Mockito.atLeastOnce()).setEntityStream(this.requestBodyCaptor.capture());
        CreateConnectorRequest createConnectorRequest = new ObjectMapper().readValue(this.requestBodyCaptor.getValue(), CreateConnectorRequest.class);

        Assert.assertEquals("Connector name should not be modified during RBAC config translation", CONNECTOR_NAME, createConnectorRequest.name());
        verifyRbacSaslJaasConfigTranslation(createConnectorRequest.config());
        verifyRbacSrConverterConfigTranslation("key.converter", converterConfig, createConnectorRequest.config());
        Assert.assertEquals("No non-RBAC configurations should be changed by RBAC config translation", stripAllRbacConfigs(true, connectorConfig), stripAllRbacConfigs(true, createConnectorRequest.config()));
    }

    /** Runs the worker-level converter translation scenario once per entry in {@link #SCHEMA_REGISTRY_CONVERTERS}. */
    @Test
    public void shouldTranslateRbacConfigsForConnectorCreateWithWorkerLevelSrConverter() throws Exception {
        for (Class<? extends Converter> converterClass : SCHEMA_REGISTRY_CONVERTERS) {
            testRbacConfigTranslationForConnectorCreateWithWorkerLevelSrConverter(converterClass);
        }
    }

    /**
     * Create-connector scenario in which the converter is configured on the worker only, using
     * the converter's simple class name; the connector config carries just the RBAC credentials.
     *
     * @param cls converter class to configure as the worker's {@code key.converter}
     * @throws Exception if filtering the request or parsing the rewritten body fails
     */
    private void testRbacConfigTranslationForConnectorCreateWithWorkerLevelSrConverter(Class<? extends Converter> cls) throws Exception {
        Map<String, String> connectorConfig = connectorConfig();
        connectorConfig.put("principal.service.name", USERNAME);
        connectorConfig.put("principal.service.password", PASSWORD);

        // Typed map instead of a raw HashMap, so the put/putAll calls below are type-checked.
        Map<String, String> converterConfig = new HashMap<>();
        converterConfig.put("key.converter", cls.getSimpleName());
        converterConfig.put("key.converter.schema.registry.url", "https://localhost:4761");
        converterConfig.put("key.converter.key.subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy");

        // The converter settings go into the worker config, so the filter has to be rebuilt.
        Map<String, String> workerConfig = workerConfig();
        workerConfig.putAll(converterConfig);
        this.connectSecurityFilter = newFilter(workerConfig);

        expectCreateConnectorRequest(CONNECTOR_NAME, connectorConfig);
        filterRequest();

        Mockito.verify(this.requestContext, Mockito.atLeastOnce()).setEntityStream(this.requestBodyCaptor.capture());
        CreateConnectorRequest createConnectorRequest = new ObjectMapper().readValue(this.requestBodyCaptor.getValue(), CreateConnectorRequest.class);

        Assert.assertEquals("Connector name should not be modified during RBAC config translation", CONNECTOR_NAME, createConnectorRequest.name());
        verifyRbacSaslJaasConfigTranslation(createConnectorRequest.config());
        verifyRbacSrConverterConfigTranslation("key.converter", converterConfig, createConnectorRequest.config());
        Assert.assertEquals("No non-RBAC configurations should be changed by RBAC config translation", stripAllRbacConfigs(true, connectorConfig), stripAllRbacConfigs(true, createConnectorRequest.config()));
    }

    /** Runs the "connector already supplies converter credentials" scenario once per entry in {@link #SCHEMA_REGISTRY_CONVERTERS}. */
    @Test
    public void shouldTranslateRbacConfigsForSrConverterEvenWhenCredentialsAlreadySuppliedForConnector() throws Exception {
        for (Class<? extends Converter> converterClass : SCHEMA_REGISTRY_CONVERTERS) {
            testRbacConfigTranslationForSrConverterEvenWhenCredentialsAlreadySuppliedForConnector(converterClass);
        }
    }

    /**
     * Create-connector scenario in which the connector config already carries basic-auth
     * credentials for the key converter in addition to the RBAC credentials.
     *
     * @param cls converter class to configure as {@code key.converter}
     * @throws Exception if filtering the request or parsing the rewritten body fails
     */
    private void testRbacConfigTranslationForSrConverterEvenWhenCredentialsAlreadySuppliedForConnector(Class<? extends Converter> cls) throws Exception {
        Map<String, String> connectorConfig = connectorConfig();
        connectorConfig.put("principal.service.name", USERNAME);
        connectorConfig.put("principal.service.password", PASSWORD);
        // Pre-existing converter credentials that are not part of the converter settings handed to the verifier below.
        connectorConfig.put("key.converter.basic.auth.credentials.source", "USER_INFO");
        connectorConfig.put("key.converter.basic.auth.user.info", "different:credentials");

        // Typed map instead of a raw HashMap, so the put/putAll calls below are type-checked.
        Map<String, String> converterConfig = new HashMap<>();
        converterConfig.put("key.converter", cls.getName());
        converterConfig.put("key.converter.schema.registry.url", "https://localhost:4761");
        connectorConfig.putAll(converterConfig);

        expectCreateConnectorRequest(CONNECTOR_NAME, connectorConfig);
        filterRequest();

        Mockito.verify(this.requestContext, Mockito.atLeastOnce()).setEntityStream(this.requestBodyCaptor.capture());
        CreateConnectorRequest createConnectorRequest = new ObjectMapper().readValue(this.requestBodyCaptor.getValue(), CreateConnectorRequest.class);

        Assert.assertEquals("Connector name should not be modified during RBAC config translation", CONNECTOR_NAME, createConnectorRequest.name());
        verifyRbacSaslJaasConfigTranslation(createConnectorRequest.config());
        verifyRbacSrConverterConfigTranslation("key.converter", converterConfig, createConnectorRequest.config());
        Assert.assertEquals("No non-RBAC configurations should be changed by RBAC config translation", stripAllRbacConfigs(true, connectorConfig), stripAllRbacConfigs(true, createConnectorRequest.config()));
    }

    /** Runs the "worker already supplies converter credentials" scenario once per entry in {@link #SCHEMA_REGISTRY_CONVERTERS}. */
    @Test
    public void shouldTranslateRbacConfigsForSrConverterEvenWhenCredentialsAlreadySuppliedForWorker() throws Exception {
        for (Class<? extends Converter> converterClass : SCHEMA_REGISTRY_CONVERTERS) {
            testRbacConfigTranslationForSrConverterEvenWhenCredentialsAlreadySuppliedForWorker(converterClass);
        }
    }

    /**
     * Create-connector scenario in which the worker config already carries bearer-auth settings
     * for the key converter; the connector config carries just the RBAC credentials.
     *
     * @param cls converter class to configure as the worker's {@code key.converter}
     * @throws Exception if filtering the request or parsing the rewritten body fails
     */
    private void testRbacConfigTranslationForSrConverterEvenWhenCredentialsAlreadySuppliedForWorker(Class<? extends Converter> cls) throws Exception {
        Map<String, String> connectorConfig = connectorConfig();
        connectorConfig.put("principal.service.name", USERNAME);
        connectorConfig.put("principal.service.password", PASSWORD);

        Map<String, String> workerConfig = workerConfig();
        // Set on the worker only; deliberately kept out of the converter settings handed to the verifier below.
        workerConfig.put("key.converter.bearer.auth.credentials.source", "STATIC_TOKEN");

        // Typed map instead of a raw HashMap, so the put/putAll calls below are type-checked.
        Map<String, String> converterConfig = new HashMap<>();
        converterConfig.put("key.converter.bearer.auth.token", "this is a valid jwt token, right?");
        converterConfig.put("key.converter", cls.getSimpleName());
        converterConfig.put("key.converter.schema.registry.url", "https://localhost:4761");
        converterConfig.put("key.converter.key.subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy");
        workerConfig.putAll(converterConfig);
        this.connectSecurityFilter = newFilter(workerConfig);

        expectCreateConnectorRequest(CONNECTOR_NAME, connectorConfig);
        filterRequest();

        Mockito.verify(this.requestContext, Mockito.atLeastOnce()).setEntityStream(this.requestBodyCaptor.capture());
        CreateConnectorRequest createConnectorRequest = new ObjectMapper().readValue(this.requestBodyCaptor.getValue(), CreateConnectorRequest.class);

        Assert.assertEquals("Connector name should not be modified during RBAC config translation", CONNECTOR_NAME, createConnectorRequest.name());
        verifyRbacSaslJaasConfigTranslation(createConnectorRequest.config());
        verifyRbacSrConverterConfigTranslation("key.converter", converterConfig, createConnectorRequest.config());
        Assert.assertEquals("No non-RBAC configurations should be changed by RBAC config translation", stripAllRbacConfigs(true, connectorConfig), stripAllRbacConfigs(true, createConnectorRequest.config()));
    }

    /**
     * With {@code enable.rbac.credential.injection} set to {@code false} on the worker, a
     * create-connector request that carries RBAC credentials must pass through unaltered.
     */
    @Test
    public void shouldNotTranslateRbacConfigsForConnectorCreateWhenDisabled() throws Exception {
        Map<String, String> injectionDisabled = workerConfig();
        injectionDisabled.put("enable.rbac.credential.injection", "false");
        connectSecurityFilter = newFilter(injectionDisabled);

        Map<String, String> submitted = connectorConfig();
        submitted.put("principal.service.name", USERNAME);
        submitted.put("principal.service.password", PASSWORD);
        expectCreateConnectorRequest(CONNECTOR_NAME, submitted);

        filterRequest();

        verifyResponseBodyWasNotAltered();
    }

    /**
     * PUT-connector-config request carrying RBAC credentials and no Schema Registry converter:
     * the rewritten body must pass the SASL/JAAS translation check and keep every non-RBAC
     * setting as submitted.
     */
    @Test
    public void shouldTranslateRbacConfigsForPutConnectorConfigWithNoSrConverter() throws Exception {
        Map<String, String> submitted = connectorConfig();
        submitted.put("principal.service.name", USERNAME);
        submitted.put("principal.service.password", PASSWORD);
        expectPutConnectorConfigRequest(submitted);

        filterRequest();

        Mockito.verify(requestContext, Mockito.atLeastOnce()).setEntityStream(requestBodyCaptor.capture());
        InputStream rewrittenBody = requestBodyCaptor.getValue();
        Map<String, String> rewritten = new ObjectMapper().readValue(rewrittenBody, ConnectRestUtils.CONNECTOR_CONFIG_TYPE);

        verifyRbacSaslJaasConfigTranslation(rewritten);
        Assert.assertEquals("No non-RBAC configurations should be changed by RBAC config translation", stripAllRbacConfigs(false, submitted), stripAllRbacConfigs(false, rewritten));
    }

    /**
     * With {@code enable.rbac.credential.injection} set to {@code false} on the worker, a
     * PUT-connector-config request that carries RBAC credentials must pass through unaltered.
     */
    @Test
    public void shouldNotTranslateRbacConfigsForPutConnectorConfigWhenDisabled() throws Exception {
        Map<String, String> injectionDisabled = workerConfig();
        injectionDisabled.put("enable.rbac.credential.injection", "false");
        connectSecurityFilter = newFilter(injectionDisabled);

        Map<String, String> submitted = connectorConfig();
        submitted.put("principal.service.name", USERNAME);
        submitted.put("principal.service.password", PASSWORD);
        expectPutConnectorConfigRequest(submitted);

        filterRequest();

        verifyResponseBodyWasNotAltered();
    }

    /**
     * List-connectors response containing three connectors: one with RBAC credentials, one with
     * RBAC credentials plus an Avro key converter that has basic-auth settings, and one without
     * any RBAC credentials. After filtering, no connector or section may be dropped, and each
     * connector's config must equal the expected map built below.
     */
    @Test
    public void shouldStripSecurityConfigsForListConnectorsResponse() throws Exception {
        this.authorizer.allowAllOperations();

        // Connector 1: RBAC credentials, no converter overrides; has both "info" and "status".
        Map<String, String> rbacConfig = connectorConfig();
        rbacConfig.put("principal.service.name", USERNAME);
        rbacConfig.put("principal.service.password", "turtleisland4Lyfe!");
        // NOTE(review): this lambda is a no-op as it stands; it presumably populated the config
        // with the client SASL/JAAS entries before decompilation -- TODO confirm against the original source.
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(ignoredKey -> {
        });
        ConnectorInfo rbacInfo = new ConnectorInfo(CONNECTOR_NAME, rbacConfig, Collections.emptyList(), ConnectorType.SINK);
        Map<String, String> expectedRbacConfig = new HashMap<>(rbacConfig);
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(expectedRbacConfig::remove);
        ConnectorStateInfo rbacState = new ConnectorStateInfo(CONNECTOR_NAME, new ConnectorStateInfo.ConnectorState("RUNNING", "http://localhost:6969", (String) null), Collections.emptyList(), ConnectorType.SINK);

        // Connector 2: RBAC credentials plus an Avro key converter with basic-auth settings.
        Map<String, String> converterConfig = connectorConfig();
        converterConfig.put("principal.service.name", "Giovanni Sollima");
        converterConfig.put("principal.service.password", "goin4aWalk#nochair");
        // NOTE(review): no-op lambda, see the note on the first connector.
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(ignoredKey -> {
        });
        converterConfig.put("key.converter", AvroConverter.class.getName());
        converterConfig.put("key.converter.basic.auth.credentials.source", "USER_INFO");
        converterConfig.put("key.converter.basic.auth.user.info", "foo:bar");
        ConnectorInfo converterInfo = new ConnectorInfo("ydkhaha", converterConfig, Collections.emptyList(), ConnectorType.SINK);
        Map<String, String> expectedConverterConfig = new HashMap<>(converterConfig);
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(expectedConverterConfig::remove);
        expectedConverterConfig.remove("key.converter.basic.auth.credentials.source");
        expectedConverterConfig.remove("key.converter.basic.auth.user.info");

        // Connector 3: no RBAC credentials; its config is expected to come back unchanged.
        Map<String, String> plainConfig = connectorConfig();
        // NOTE(review): no-op lambda, see the note on the first connector.
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(ignoredKey -> {
        });
        ConnectorInfo plainInfo = new ConnectorInfo("power-user", plainConfig, Collections.emptyList(), ConnectorType.SINK);
        Map<String, String> expectedPlainConfig = new HashMap<>(plainConfig);

        // Response body: connector name -> section name ("info"/"status") -> entity.
        Map<String, Map<String, Object>> listResponse = new HashMap<>();
        listResponse.computeIfAbsent(CONNECTOR_NAME, name -> new HashMap<>()).put("info", rbacInfo);
        listResponse.get(CONNECTOR_NAME).put("status", rbacState);
        listResponse.computeIfAbsent("ydkhaha", name -> new HashMap<>()).put("info", converterInfo);
        listResponse.computeIfAbsent("power-user", name -> new HashMap<>()).put("info", plainInfo);

        expectRequestMethod(ConnectRestApiMethods.LIST_CONNECTORS);
        expectResponseStatus(Response.Status.OK);
        Mockito.when(this.responseContext.getEntity()).thenReturn(listResponse);

        filterResponse();

        Mockito.verify(this.responseContext, Mockito.atLeastOnce()).setEntity(this.connectorsMapCaptor.capture());
        Map<String, Map<String, Object>> filtered = this.connectorsMapCaptor.getValue();
        Assert.assertEquals("No connectors should be dropped from response body", listResponse.keySet(), filtered.keySet());
        listResponse.forEach((name, sections) ->
                Assert.assertEquals("No connector status or info should be dropped from or added to response body", sections.keySet(), filtered.get(name).keySet()));
        Assert.assertEquals("Client configs should be stripped from connector config when RBAC credential configs are present", expectedRbacConfig, ((ConnectorInfo) filtered.get(CONNECTOR_NAME).get("info")).config());
        Assert.assertEquals("Connector state should be unaffected", rbacState, filtered.get(CONNECTOR_NAME).get("status"));
        Assert.assertEquals("Client and Schema Registry-based converter security configs should be stripped fromconnector config when RBAC credential configs are present", expectedConverterConfig, ((ConnectorInfo) filtered.get("ydkhaha").get("info")).config());
        Assert.assertEquals("Connector config should be unaffected when no RBAC credential configs are present", expectedPlainConfig, ((ConnectorInfo) filtered.get("power-user").get("info")).config());
    }

    /**
     * Create-connector response: the filter must never replace the response entity, and the
     * {@link ConnectorInfo} config must end up equal to the submitted config minus the client
     * SASL/JAAS keys.
     */
    @Test
    public void shouldStripSecurityConfigsForCreateConnectorResponse() throws Exception {
        Map<String, String> connectorConfig = connectorConfig();
        connectorConfig.put("principal.service.name", USERNAME);
        connectorConfig.put("principal.service.password", PASSWORD);
        // NOTE(review): this lambda is a no-op as it stands; it presumably populated the config
        // with the client SASL/JAAS entries before decompilation -- TODO confirm against the original source.
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(ignoredKey -> {
        });
        ConnectorInfo connectorInfo = new ConnectorInfo(CONNECTOR_NAME, connectorConfig, Collections.emptyList(), ConnectorType.SINK);

        // Expected result: a snapshot of the config with every client SASL/JAAS key removed.
        Map<String, String> expectedConfig = new HashMap<>(connectorConfig);
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(expectedConfig::remove);

        expectRequestMethod(ConnectRestApiMethods.CREATE_CONNECTOR);
        expectResponseStatus(Response.Status.OK);
        Mockito.when(this.responseContext.getEntity()).thenReturn(connectorInfo);

        filterResponse();

        Mockito.verify(this.responseContext, Mockito.never()).setEntity(ArgumentMatchers.any());
        Assert.assertEquals(expectedConfig, connectorInfo.config());
    }

    /**
     * PUT-connector-config response: after filtering, the {@link ConnectorInfo} config must equal
     * the submitted config minus the client SASL/JAAS keys.
     */
    @Test
    public void shouldStripSecurityConfigsForPutConnectorResponse() throws Exception {
        Map<String, String> connectorConfig = connectorConfig();
        connectorConfig.put("principal.service.name", USERNAME);
        connectorConfig.put("principal.service.password", PASSWORD);
        // NOTE(review): this lambda is a no-op as it stands; it presumably populated the config
        // with the client SASL/JAAS entries before decompilation -- TODO confirm against the original source.
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(ignoredKey -> {
        });
        ConnectorInfo connectorInfo = new ConnectorInfo(CONNECTOR_NAME, connectorConfig, Collections.emptyList(), ConnectorType.SINK);

        // Expected result: a snapshot of the config with every client SASL/JAAS key removed.
        Map<String, String> expectedConfig = new HashMap<>(connectorConfig);
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(expectedConfig::remove);

        expectRequestMethod(ConnectRestApiMethods.PUT_CONNECTOR_CONFIG);
        expectResponseStatus(Response.Status.OK);
        Mockito.when(this.responseContext.getEntity()).thenReturn(connectorInfo);

        filterResponse();

        Assert.assertEquals(expectedConfig, connectorInfo.config());
    }

    /**
     * GET-connector response: after filtering, the {@link ConnectorInfo} config must equal the
     * original config minus the client SASL/JAAS keys.
     */
    @Test
    public void shouldStripSecurityConfigsForGetConnectorResponse() throws Exception {
        Map<String, String> connectorConfig = connectorConfig();
        connectorConfig.put("principal.service.name", USERNAME);
        connectorConfig.put("principal.service.password", PASSWORD);
        // NOTE(review): this lambda is a no-op as it stands; it presumably populated the config
        // with the client SASL/JAAS entries before decompilation -- TODO confirm against the original source.
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(ignoredKey -> {
        });
        ConnectorInfo connectorInfo = new ConnectorInfo(CONNECTOR_NAME, connectorConfig, Collections.emptyList(), ConnectorType.SINK);

        // Expected result: a snapshot of the config with every client SASL/JAAS key removed.
        Map<String, String> expectedConfig = new HashMap<>(connectorConfig);
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(expectedConfig::remove);

        expectRequestMethod(ConnectRestApiMethods.GET_CONNECTOR);
        expectResponseStatus(Response.Status.OK);
        Mockito.when(this.responseContext.getEntity()).thenReturn(connectorInfo);

        filterResponse();

        Assert.assertEquals(expectedConfig, connectorInfo.config());
    }

    /**
     * GET-connector-config response, where the entity is the bare config map: after filtering,
     * that map must equal the original config minus the client SASL/JAAS keys.
     */
    @Test
    public void shouldStripSecurityConfigsForGetConnectorConfigResponse() throws Exception {
        Map<String, String> connectorConfig = connectorConfig();
        connectorConfig.put("principal.service.name", USERNAME);
        connectorConfig.put("principal.service.password", PASSWORD);
        // NOTE(review): this lambda is a no-op as it stands; it presumably populated the config
        // with the client SASL/JAAS entries before decompilation -- TODO confirm against the original source.
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(ignoredKey -> {
        });

        // Expected result: a snapshot of the config with every client SASL/JAAS key removed.
        Map<String, String> expectedConfig = new HashMap<>(connectorConfig);
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(expectedConfig::remove);

        expectRequestMethod(ConnectRestApiMethods.GET_CONNECTOR_CONFIG);
        expectResponseStatus(Response.Status.OK);
        Mockito.when(this.responseContext.getEntity()).thenReturn(connectorConfig);

        filterResponse();

        Assert.assertEquals(expectedConfig, connectorConfig);
    }

    /**
     * GET-connector response for a connector whose key converter is {@code Rot16Converter}: the
     * converter's basic-auth settings are part of the expected config, so only the client
     * SASL/JAAS keys may be removed.
     */
    @Test
    public void shouldNotTryToStripSecurityConfigsForNonSrConverter() throws Exception {
        Map<String, String> connectorConfig = connectorConfig();
        connectorConfig.put("principal.service.name", USERNAME);
        connectorConfig.put("principal.service.password", PASSWORD);
        // NOTE(review): this lambda is a no-op as it stands; it presumably populated the config
        // with the client SASL/JAAS entries before decompilation -- TODO confirm against the original source.
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(ignoredKey -> {
        });
        connectorConfig.put("key.converter", "Rot16Converter");
        connectorConfig.put("key.converter.basic.auth.credentials.source", "USER_INFO");
        connectorConfig.put("key.converter.basic.auth.user.info", "foo:bar");
        ConnectorInfo connectorInfo = new ConnectorInfo(CONNECTOR_NAME, connectorConfig, Collections.emptyList(), ConnectorType.SINK);

        // Expected result keeps the converter's basic-auth entries; only SASL/JAAS keys are removed.
        Map<String, String> expectedConfig = new HashMap<>(connectorConfig);
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(expectedConfig::remove);

        expectRequestMethod(ConnectRestApiMethods.GET_CONNECTOR);
        expectResponseStatus(Response.Status.OK);
        Mockito.when(this.responseContext.getEntity()).thenReturn(connectorInfo);

        filterResponse();

        Assert.assertEquals(expectedConfig, connectorInfo.config());
    }

    /** Filtering a successful connectors request whose entity is an empty list must complete without error. */
    @Test
    public void shouldNotTryToFilterEmptyConnectorsListResponse() throws Exception {
        expectConnectorsRequest(true);
        Object emptyEntity = Collections.emptyList();
        Mockito.when(responseContext.getEntity()).thenReturn(emptyEntity);

        filterResponse();
    }

    /** Filtering a successful connectors request whose entity is an empty map must complete without error. */
    @Test
    public void shouldNotTryToFilterEmptyConnectorsMapResponse() throws Exception {
        expectConnectorsRequest(true);
        Object emptyEntity = Collections.emptyMap();
        Mockito.when(responseContext.getEntity()).thenReturn(emptyEntity);

        filterResponse();
    }

    /**
     * For a request mapped to {@link #DUMMY_METHOD}, the filter may read the response status but
     * must not interact with the request or response in any other way.
     */
    @Test
    public void shouldNotTryToFilterNonConnectorsListRequest() {
        expectResponseStatus(Response.Status.OK);
        expectRequestMethod(DUMMY_METHOD);

        filterResponse();

        Mockito.verify(responseContext, Mockito.atLeastOnce()).getStatus();
        Mockito.verifyNoMoreInteractions(responseContext);
        Mockito.verifyNoMoreInteractions(requestContext);
    }

    /**
     * When the connector-listing response carries a non-OK status, the response filter may read
     * the status but must not interact with the request or response contexts any further.
     */
    @Test
    public void shouldNotTryToFilterConnectorsListOnFailedRequest() {
        expectConnectorsRequest(false);

        filterResponse();

        // Reading the status is tolerated; anything beyond that is flagged below.
        Mockito.verify(responseContext, Mockito.atLeastOnce()).getStatus();
        Mockito.verifyNoMoreInteractions(responseContext);
        Mockito.verifyNoMoreInteractions(requestContext);
    }

    /**
     * A successful listing whose entity is {@code CONNECTOR_NAMES} must have its entity replaced
     * with a collection matching {@code VISIBLE_CONNECTOR_NAMES}.
     */
    @Test
    public void shouldFilterConnectorsListOnSuccessfulRequest() {
        expectConnectorsRequest(true);
        Mockito.when(responseContext.getEntity()).thenReturn(CONNECTOR_NAMES);

        filterResponse();

        Mockito.verify(responseContext).setEntity(connectorsCaptor.capture());
        Collection replacedEntity = (Collection) connectorsCaptor.getValue();
        assertConnectorListMatches(VISIBLE_CONNECTOR_NAMES, replacedEntity);
    }

    /**
     * A successful listing whose entity is the {@code CONNECTORS} map must have its entity
     * replaced with a map whose keys match {@code VISIBLE_CONNECTOR_NAMES}.
     */
    @Test
    public void shouldFilterConnectorsMapOnSuccessfulRequest() {
        expectConnectorsRequest(true);
        Mockito.when(responseContext.getEntity()).thenReturn(CONNECTORS);

        filterResponse();

        Mockito.verify(responseContext).setEntity(connectorsMapCaptor.capture());
        Map replacedEntity = (Map) connectorsMapCaptor.getValue();
        assertConnectorListMatches(VISIBLE_CONNECTOR_NAMES, replacedEntity.keySet());
    }

    /**
     * Checks the per-connector contents of the filtered {@code CONNECTORS} map: the entry for
     * "connectorOne" must keep "status" but lose "info", while "connectorTwo" keeps both.
     */
    @Test
    public void shouldFilterConnectorInfoMapOnSuccessfulRequest() {
        expectConnectorsRequest(true);
        Mockito.when(responseContext.getEntity()).thenReturn(CONNECTORS);

        filterResponse();

        Mockito.verify(responseContext).setEntity(connectorsMapCaptor.capture());
        Map replacedEntity = (Map) connectorsMapCaptor.getValue();
        assertConnectorListMatches(VISIBLE_CONNECTOR_NAMES, replacedEntity.keySet());

        Map firstConnector = (Map) replacedEntity.get("connectorOne");
        Assert.assertFalse(firstConnector.containsKey("info"));
        Assert.assertTrue(firstConnector.containsKey("status"));

        Map secondConnector = (Map) replacedEntity.get("connectorTwo");
        Assert.assertTrue(secondConnector.containsKey("info"));
        Assert.assertTrue(secondConnector.containsKey("status"));
    }

    /**
     * Replaces the authorizer with a mock that answers every {@code authorize} call with an empty
     * list, rebuilds the filter around it, and expects the listing response to be aborted with an
     * INTERNAL_SERVER_ERROR error response (message not checked).
     */
    @Test
    public void should500OnMismatchedReturn() {
        expectConnectorsRequest(true);
        Mockito.when(responseContext.getEntity()).thenReturn(CONNECTOR_NAMES);

        authorizer = Mockito.mock(MockAuthorizer.class);
        // Matcher casts are kept as-is: they may be what selects the authorize(...) overload.
        Mockito.when(authorizer.authorize((KafkaPrincipal) ArgumentMatchers.any(), (String) ArgumentMatchers.any(), (List) ArgumentMatchers.any())).thenReturn(Collections.emptyList());
        ConnectSecurityExtensionConfig extensionConfig = new ConnectSecurityExtensionConfig(workerConfig());
        connectSecurityFilter = new ConnectSecurityFilter(extensionConfig, SCOPE, connectActions, authorizer, resourceInfo);

        filterResponse();

        Mockito.verify(requestContext).abortWith((Response) responseCaptor.capture());
        Response abortResponse = (Response) responseCaptor.getValue();
        int expectedStatus = Response.Status.INTERNAL_SERVER_ERROR.getStatusCode();
        assertErrorResponseMatches(abortResponse, expectedStatus, null);
    }

    /**
     * For every operation in {@code ConnectorOperations.ALL}, grants only that operation on a
     * single connector and checks that the connector still appears in the filtered listing.
     */
    @Test
    public void shouldListConnectorWithAnyPermittedConnectorOperation() {
        String connectorName = "replicatorIsTotallyAConnectorCommaNathan";
        Set<String> listedConnectors = Collections.singleton(connectorName);

        ConnectorOperations.ALL.forEach(operation -> {
            // Start each iteration from clean mocks and an authorizer with no grants.
            Mockito.reset(resourceInfo, responseContext);
            authorizer.clear();

            expectResponseStatus(Response.Status.OK);
            expectRequestMethod(ConnectRestApiMethods.LIST_CONNECTORS);
            Mockito.when(responseContext.getEntity()).thenReturn(listedConnectors);
            authorizer.allowConnectorOperations(PRINCIPAL, connectorName, operation);

            filterResponse();

            Mockito.verify(responseContext).setEntity(connectorsCaptor.capture());
            assertConnectorListMatches(listedConnectors, (Collection) connectorsCaptor.getValue());
        });
    }

    /**
     * When {@code connectActions} reports no actions for the requested method, the request filter
     * must finish without any interaction on the request context.
     */
    @Test
    public void shouldAllowActionsThatDoNotRequireAuthorization() throws Exception {
        authorizer.expectAuthorization(false);
        expectRequestMethod(DUMMY_METHOD);
        Mockito.when(connectActions.actions(DUMMY_METHOD, requestContext)).thenReturn(Collections.emptyList());

        filterRequest();

        Mockito.verifyNoMoreInteractions(requestContext);
    }

    /**
     * Grants PAUSE and RESUME on a connector, makes {@code connectActions} demand exactly those
     * two actions for the requested method, and expects the request filter to complete normally.
     */
    @Test
    public void shouldAllowAuthorizedActions() throws Exception {
        String connectorName = "rotcennoc";
        this.authorizer.clear();
        this.authorizer.allowConnectorOperations(PRINCIPAL, connectorName, ConnectorOperations.PAUSE, ConnectorOperations.RESUME);
        expectRequestMethod(DUMMY_METHOD);

        // Stream the operations with their own static type; routing them through an Object[]
        // cast would hand the Action constructor an Object instead of an Operation.
        List<Action> requiredActions = Stream.of(ConnectorOperations.PAUSE, ConnectorOperations.RESUME)
                .map(operation -> new Action(SCOPE, ConnectActions.CONNECTOR_RESOURCE, connectorName, operation))
                .collect(Collectors.toList());
        Mockito.when(this.connectActions.actions(DUMMY_METHOD, this.requestContext)).thenReturn(requiredActions);

        filterRequest();
    }

    /**
     * With a security context that reports no user principal, a request requiring PAUSE and
     * RESUME must be aborted with an UNAUTHORIZED error response reading "User not authenticated".
     */
    @Test
    public void shouldDisallowUnauthenticatedActions() throws Exception {
        String connectorName = "rotcennoc";
        this.authorizer.clear();
        expectRequestMethod(DUMMY_METHOD);

        // Stream the operations with their own static type; routing them through an Object[]
        // cast would hand the Action constructor an Object instead of an Operation.
        List<Action> requiredActions = Stream.of(ConnectorOperations.PAUSE, ConnectorOperations.RESUME)
                .map(operation -> new Action(SCOPE, ConnectActions.CONNECTOR_RESOURCE, connectorName, operation))
                .collect(Collectors.toList());
        Mockito.when(this.connectActions.actions(DUMMY_METHOD, this.requestContext)).thenReturn(requiredActions);

        SecurityContext securityContext = Mockito.mock(SecurityContext.class);
        // A plain null keeps the stub type-compatible with getUserPrincipal()'s return type.
        Mockito.when(securityContext.getUserPrincipal()).thenReturn(null);
        Mockito.when(this.requestContext.getSecurityContext()).thenReturn(securityContext);

        ArgumentCaptor<Response> abortCaptor = ArgumentCaptor.forClass(Response.class);
        filterRequest();
        Mockito.verify(this.requestContext).abortWith(abortCaptor.capture());
        assertErrorResponseMatches(abortCaptor.getValue(), Response.Status.UNAUTHORIZED.getStatusCode(), "User not authenticated");
    }

    /**
     * With the authorizer cleared of all grants, a request requiring PAUSE and RESUME must be
     * aborted with a FORBIDDEN error response reading "Unauthorized operation".
     */
    @Test
    public void shouldDisallowUnauthorizedActions() throws Exception {
        String connectorName = "rotcennoc";
        this.authorizer.clear();
        expectRequestMethod(DUMMY_METHOD);

        // Stream the operations with their own static type; routing them through an Object[]
        // cast would hand the Action constructor an Object instead of an Operation.
        List<Action> requiredActions = Stream.of(ConnectorOperations.PAUSE, ConnectorOperations.RESUME)
                .map(operation -> new Action(SCOPE, ConnectActions.CONNECTOR_RESOURCE, connectorName, operation))
                .collect(Collectors.toList());
        Mockito.when(this.connectActions.actions(DUMMY_METHOD, this.requestContext)).thenReturn(requiredActions);

        ArgumentCaptor<Response> abortCaptor = ArgumentCaptor.forClass(Response.class);
        filterRequest();
        Mockito.verify(this.requestContext).abortWith(abortCaptor.capture());
        assertErrorResponseMatches(abortCaptor.getValue(), Response.Status.FORBIDDEN.getStatusCode(), "Unauthorized operation");
    }

    /**
     * Invokes the single-argument {@code filter} overload of {@code connectSecurityFilter} with
     * the {@code requestContext} fixture.
     *
     * @throws Exception whatever the filter call throws
     */
    protected void filterRequest() throws Exception {
        this.connectSecurityFilter.filter(this.requestContext);
    }

    /**
     * Invokes the two-argument {@code filter} overload of {@code connectSecurityFilter} with the
     * {@code requestContext} and {@code responseContext} fixtures.
     */
    protected void filterResponse() {
        this.connectSecurityFilter.filter(this.requestContext, this.responseContext);
    }

    /**
     * Stubs a call to the connector-listing endpoint.
     *
     * @param z {@code true} to stub an OK response status, {@code false} to stub UNAUTHORIZED
     */
    protected void expectConnectorsRequest(boolean z) {
        Response.Status stubbedStatus;
        if (z) {
            stubbedStatus = Response.Status.OK;
        } else {
            stubbedStatus = Response.Status.UNAUTHORIZED;
        }
        expectResponseStatus(stubbedStatus);
        expectRequestMethod(ConnectRestApiMethods.LIST_CONNECTORS);
    }

    /**
     * Stubs an OK response to the connector-config validation endpoint whose entity is the given
     * validation result.
     *
     * @param configInfos entity the response context mock should return
     * @throws IOException declared for callers; none of the visible calls is declared to throw it
     */
    protected void expectValidateConfigResponse(ConfigInfos configInfos) throws IOException {
        expectResponseStatus(Response.Status.OK);
        expectRequestMethod(ConnectRestApiMethods.CONNECTOR_CONFIG_VALIDATE);
        Object responseEntity = configInfos;
        Mockito.when(responseContext.getEntity()).thenReturn(responseEntity);
    }

    /**
     * Stubs a create-connector request carrying an explicit initial state.
     *
     * @param str connector name placed in the request body
     * @param map connector configuration placed in the request body
     * @param initialState initial state placed in the request body
     * @throws IOException if the request body cannot be serialized
     */
    protected void expectCreateConnectorRequest(String str, Map<String, String> map, CreateConnectorRequest.InitialState initialState) throws IOException {
        expectRequestMethod(ConnectRestApiMethods.CREATE_CONNECTOR);
        CreateConnectorRequest requestBody = new CreateConnectorRequest(str, map, initialState);
        expectRequestBody(requestBody);
    }

    /**
     * Stubs a create-connector request whose body carries a {@code null} initial state.
     *
     * @param str connector name placed in the request body
     * @param map connector configuration placed in the request body
     * @throws IOException if the request body cannot be serialized
     */
    protected void expectCreateConnectorRequest(String str, Map<String, String> map) throws IOException {
        expectRequestMethod(ConnectRestApiMethods.CREATE_CONNECTOR);
        CreateConnectorRequest.InitialState unspecifiedState = null;
        CreateConnectorRequest requestBody = new CreateConnectorRequest(str, map, unspecifiedState);
        expectRequestBody(requestBody);
    }

    /**
     * Stubs a put-connector-config request: sets the resource method to
     * {@code PUT_CONNECTOR_CONFIG} and uses the given map as the request body.
     *
     * @param map connector configuration to use as the request body
     * @throws IOException if the request body cannot be serialized
     */
    protected void expectPutConnectorConfigRequest(Map<String, String> map) throws IOException {
        expectRequestMethod(ConnectRestApiMethods.PUT_CONNECTOR_CONFIG);
        expectRequestBody(map);
    }

    /**
     * Stubs {@code resourceInfo.getResourceMethod()} to return the given method.
     *
     * @param method resource method the mock should report
     */
    protected void expectRequestMethod(Method method) {
        Mockito.when(this.resourceInfo.getResourceMethod()).thenReturn(method);
    }

    /**
     * Makes the request context expose the given object as its body, both as a JSON-serialized
     * entity stream and through {@code ConnectRestUtils.setEntity}.
     *
     * @param obj object to serialize and register as the request entity
     * @throws IOException if Jackson cannot serialize {@code obj}
     */
    protected void expectRequestBody(Object obj) throws IOException {
        // Serialize before opening the stubbing: if this threw between when(...) and
        // thenReturn(...), Mockito would be left with an unfinished stubbing.
        byte[] serializedBody = new ObjectMapper().writeValueAsBytes(obj);
        InputStream bodyStream = new ByteArrayInputStream(serializedBody);
        Mockito.when(this.requestContext.getEntityStream()).thenReturn(bodyStream);
        ConnectRestUtils.setEntity(this.requestContext, obj);
    }

    /**
     * Stubs {@code responseContext.getStatus()} to return the numeric code of the given status.
     *
     * @param status status whose code the mock should report
     */
    protected void expectResponseStatus(Response.Status status) {
        // Autoboxing performs the same Integer conversion the explicit valueOf calls did.
        Mockito.when(responseContext.getStatus()).thenReturn(status.getStatusCode());
    }

    /**
     * Asserts that two collections of connector names hold the same distinct elements and have
     * the same size.
     *
     * @param collection expected connector names
     * @param collection2 actual connector names
     */
    protected void assertConnectorListMatches(Collection<String> collection, Collection<String> collection2) {
        Set<String> expectedNames = new HashSet<>(collection);
        Set<String> actualNames = new HashSet<>(collection2);
        Assert.assertEquals(expectedNames, actualNames);
        // Set equality ignores duplicates, so the sizes are compared separately.
        Assert.assertEquals(collection.size(), collection2.size());
    }

    /**
     * Verifies that {@code setEntity} was never called on {@code responseContext}, whatever the
     * argument.
     */
    protected void verifyResponseBodyWasNotAltered() {
        ((ContainerResponseContext) Mockito.verify(this.responseContext, Mockito.never())).setEntity(ArgumentMatchers.any());
    }

    /**
     * Asserts that every key in {@code CONNECTOR_CLIENT_SASL_JAAS_CONFIGS} maps, in the given
     * config, to the OAuthBearer JAAS line built from {@code USERNAME}, {@code PASSWORD} and
     * {@code MDS_URL}.
     *
     * @param map connector configuration to inspect
     */
    protected void verifyRbacSaslJaasConfigTranslation(Map<String, String> map) {
        String expectedJaasConfig = String.format("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username=\"%s\" password=\"%s\" metadataServerUrls=\"%s\";", USERNAME, PASSWORD, MDS_URL);
        RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.forEach(property -> {
            String failureMessage = String.format("RBAC username and password properties should be translated into %s property", property);
            Assert.assertEquals(failureMessage, expectedJaasConfig, map.get(property));
        });
    }

    /**
     * Asserts the converter section of a translated config: the converter class is carried over,
     * basic-auth credentials are set to the expected values, and the remaining converter entries
     * equal the expected map.
     *
     * @param str converter property name whose entries are checked (also used as the key prefix)
     * @param map expected converter entries, excluding the credential settings
     * @param map2 translated configuration to inspect
     */
    protected void verifyRbacSrConverterConfigTranslation(String str, Map<String, String> map, Map<String, String> map2) {
        Assert.assertEquals("Converter class should be configured", map.get(str), map2.get(str));
        Assert.assertEquals("Converter should be configured to use user-provided basic auth credentials", "USER_INFO", map2.get(str + ".basic.auth.credentials.source"));
        Assert.assertEquals("Converter should be configured with basic auth credentials", "Mark Summer:turtleIsland4Lyfe!", map2.get(str + ".basic.auth.user.info"));

        // Keep the converter class entry plus every prefixed entry that is not a credential setting.
        Map<String, String> nonSecurityConverterConfigs = map2.entrySet().stream()
                .filter(entry -> {
                    String key = entry.getKey();
                    if (key.equals(str)) {
                        return true;
                    }
                    if (!key.startsWith(str + ".")) {
                        return false;
                    }
                    String suffix = key.substring(str.length() + 1);
                    return !suffix.equals("basic.auth.credentials.source")
                            && !suffix.equals("bearer.auth.credentials.source")
                            && !suffix.equals("basic.auth.user.info");
                })
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        Assert.assertEquals("Non-security converter configs should be allowed to pass through config translation", map, nonSecurityConverterConfigs);
    }

    /**
     * Returns a copy of the given config without the injected RBAC credential keys and the client
     * SASL JAAS keys, and optionally without any converter keys.
     *
     * @param z when {@code true}, converter entries are dropped as well
     * @param map configuration to copy from
     * @return a new map holding the entries that were kept
     */
    protected Map<String, String> stripAllRbacConfigs(boolean z, Map<String, String> map) {
        return map.entrySet().stream()
                .filter(entry -> {
                    String key = entry.getKey();
                    if (isInjectedRbacConfig(key)) {
                        return false;
                    }
                    if (RbacBasicCredentialsManipulator.CONNECTOR_CLIENT_SASL_JAAS_CONFIGS.contains(key)) {
                        return false;
                    }
                    return !(z && isConverterConfig(key));
                })
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Tells whether a property name is one of the two RBAC credential keys.
     *
     * @param str property name, may be {@code null}
     * @return {@code true} only for "principal.service.name" or "principal.service.password"
     */
    protected static boolean isInjectedRbacConfig(String str) {
        // Calling equals on the literal makes a null argument fall through to false.
        if ("principal.service.name".equals(str)) {
            return true;
        }
        return "principal.service.password".equals(str);
    }

    /**
     * Tells whether a property name configures the key or value converter.
     *
     * @param str property name, may be {@code null}
     * @return {@code true} when the name is "key.converter" or "value.converter", or starts with
     *         either of them followed by a dot
     */
    protected static boolean isConverterConfig(String str) {
        if (str == null) {
            return false;
        }
        for (String converterProperty : new String[] {"key.converter", "value.converter"}) {
            if (str.equals(converterProperty) || str.startsWith(converterProperty + ".")) {
                return true;
            }
        }
        return false;
    }

    /**
     * Asserts that a response has the expected status and carries an {@link ErrorMessage} entity
     * with a matching error code and, optionally, a matching message.
     *
     * @param response response to inspect
     * @param i expected status code; the entity's error code must equal it as well
     * @param str expected error message, or {@code null} to skip the message check
     */
    protected void assertErrorResponseMatches(Response response, int i, String str) {
        // JUnit takes (expected, actual); the reverse order produced misleading failure text.
        Assert.assertEquals(i, response.getStatus());
        Assert.assertThat(response.getEntity(), IsInstanceOf.instanceOf(ErrorMessage.class));
        ErrorMessage errorMessage = (ErrorMessage) response.getEntity();
        Assert.assertEquals(i, errorMessage.getErrorCode());
        if (str != null) {
            Assert.assertEquals(str, errorMessage.getMessage());
        }
    }

    /**
     * Intentionally empty. The static initializer looks this method up by name via
     * {@code Class.getMethod} to populate {@code DUMMY_METHOD}, so it has to stay public.
     */
    public void dummyMethod() {
    }

    // Resolves the reflective handle to dummyMethod() and builds the set of converter classes
    // stored in SCHEMA_REGISTRY_CONVERTERS.
    static {
        try {
            DUMMY_METHOD = ConnectSecurityFilterTest.class.getMethod("dummyMethod");
            SCHEMA_REGISTRY_CONVERTERS = ImmutableSet.of(AvroConverter.class, JsonSchemaConverter.class, ProtobufConverter.class);
        } catch (NoSuchMethodException e) {
            // Keep the original exception as the cause so the failing lookup stays diagnosable.
            throw new RuntimeException("Failed to locate method used during testing", e);
        }
    }
}
