package io.confluent.ksql.security;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.security.authorizer.KsqlAuthorizerProviderImpl;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.security.auth.client.RestAuthorizer;
import java.io.IOException;
import java.util.Optional;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:io/confluent/ksql/security/KsqlConfluentSecurityExtensionTest.class */
public class KsqlConfluentSecurityExtensionTest {

    @Mock
    private RestAuthorizer restAuthorizer;
    private KsqlConfluentSecurityExtension ksqlConfluentSecurityExtension;

    @Before
    public void setUp() {
        this.ksqlConfluentSecurityExtension = new KsqlConfluentSecurityExtension(() -> {
            return this.restAuthorizer;
        }, ksqlConfig -> {
            return "kafka-id";
        }, ksqlConfig2 -> {
            return false;
        });
        this.ksqlConfluentSecurityExtension.initialize(new KsqlConfig(ImmutableMap.of("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name())));
    }

    @Test
    public void shouldThrowOnInitializeIfSecurityProtocolIsEmpty() {
        KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of("security.protocol", ""));
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(KsqlException.class, () -> {
            this.ksqlConfluentSecurityExtension.initialize(ksqlConfig);
        })).getMessage(), StringContains.containsString(String.format("Failed to initialize Confluent RBAC: '%s' is empty. Only SASL_PLAINTEXT and SASL_SSL are allowed.", "security.protocol")));
    }

    @Test
    public void shouldThrowOnInitializeIfSecurityProtocolIsUnknown() {
        KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of("security.protocol", "YO-YO"));
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(KsqlException.class, () -> {
            this.ksqlConfluentSecurityExtension.initialize(ksqlConfig);
        })).getMessage(), StringContains.containsString(String.format("Failed to initialize Confluent RBAC: Unknown '%s' value (YO-YO). Only SASL_PLAINTEXT and SASL_SSL are allowed.", "security.protocol")));
    }

    @Test
    public void shouldThrowOnInitializeIfSecurityProtocolIsPlainText() {
        KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of("security.protocol", SecurityProtocol.PLAINTEXT.name()));
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(KsqlException.class, () -> {
            this.ksqlConfluentSecurityExtension.initialize(ksqlConfig);
        })).getMessage(), StringContains.containsString(buildInitializeErrorMessage(SecurityProtocol.PLAINTEXT)));
    }

    @Test
    public void shouldThrowOnInitializeIfSecurityProtocolIsSSL() {
        KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of("security.protocol", SecurityProtocol.SSL.name()));
        MatcherAssert.assertThat(((Exception) Assert.assertThrows(KsqlException.class, () -> {
            this.ksqlConfluentSecurityExtension.initialize(ksqlConfig);
        })).getMessage(), StringContains.containsString(buildInitializeErrorMessage(SecurityProtocol.SSL)));
    }

    @Test
    public void shouldNotThrowIfSecurityProtocolIsSaslPlainText() {
        this.ksqlConfluentSecurityExtension.initialize(new KsqlConfig(ImmutableMap.of("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name())));
    }

    @Test
    public void shouldNotThrowIfSecurityProtocolIsSaslSSL() {
        this.ksqlConfluentSecurityExtension.initialize(new KsqlConfig(ImmutableMap.of("security.protocol", SecurityProtocol.SASL_SSL.name())));
    }

    @Test
    public void shouldGetAuthorizationProvider() {
        Optional authorizationProvider = this.ksqlConfluentSecurityExtension.getAuthorizationProvider();
        MatcherAssert.assertThat(authorizationProvider.get(), IsInstanceOf.instanceOf(KsqlAuthorizerProviderImpl.class));
        MatcherAssert.assertThat(((KsqlAuthorizerProviderImpl) authorizationProvider.get()).getSchemaRegistryPermissionsValidator(), CoreMatchers.is(Optional.empty()));
    }

    @Test
    public void shouldEnableSchemaRegistryPermissionsValidator() {
        this.ksqlConfluentSecurityExtension = new KsqlConfluentSecurityExtension(() -> {
            return this.restAuthorizer;
        }, ksqlConfig -> {
            return "kafka-id";
        }, ksqlConfig2 -> {
            return true;
        });
        this.ksqlConfluentSecurityExtension.initialize(new KsqlConfig(ImmutableMap.of("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name())));
        Optional authorizationProvider = this.ksqlConfluentSecurityExtension.getAuthorizationProvider();
        MatcherAssert.assertThat(authorizationProvider.get(), IsInstanceOf.instanceOf(KsqlAuthorizerProviderImpl.class));
        MatcherAssert.assertThat(((KsqlAuthorizerProviderImpl) authorizationProvider.get()).getSchemaRegistryPermissionsValidator(), CoreMatchers.not(Optional.empty()));
    }

    @Test
    public void shouldGetUserContextProvider() {
        MatcherAssert.assertThat(this.ksqlConfluentSecurityExtension.getUserContextProvider().get(), IsInstanceOf.instanceOf(KsqlUserContextProviderImpl.class));
    }

    @Test
    public void shouldNotGetPermissionsValidatorIfExplicitlyDisabled() {
        this.ksqlConfluentSecurityExtension = new KsqlConfluentSecurityExtension(() -> {
            return this.restAuthorizer;
        }, ksqlConfig -> {
            return "kafka-id";
        }, ksqlConfig2 -> {
            return true;
        });
        this.ksqlConfluentSecurityExtension.initialize(new KsqlConfig(ImmutableMap.of("security.protocol", SecurityProtocol.SASL_PLAINTEXT.name(), "ksql.security.extension.sr-permissions.validator.enabled", false)));
        MatcherAssert.assertThat(this.ksqlConfluentSecurityExtension.getSchemaRegistryPermissionsValidator(), CoreMatchers.is(Optional.empty()));
    }

    @Test
    public void shouldCloseRestAuthorizerOnClose() throws IOException {
        this.ksqlConfluentSecurityExtension.close();
        ((RestAuthorizer) Mockito.verify(this.restAuthorizer)).close();
    }

    private String buildInitializeErrorMessage(SecurityProtocol securityProtocol) {
        return String.format("Failed to initialize Confluent RBAC: Unsupported '%s' value (%s). Only SASL_PLAINTEXT and SASL_SSL are allowed.", "security.protocol", securityProtocol);
    }
}
