package co.cask.cdap.internal.audit;

import co.cask.cdap.WordCountApp;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.data2.audit.AuditModule;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.kafka.KafkaTester;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.audit.AuditMessage;
import co.cask.cdap.proto.audit.AuditType;
import co.cask.cdap.proto.codec.AuditMessageTypeAdapter;
import co.cask.cdap.proto.codec.EntityIdTypeAdapter;
import co.cask.cdap.proto.element.EntityType;
import co.cask.cdap.proto.id.ArtifactId;
import co.cask.cdap.proto.id.EntityId;
import co.cask.cdap.proto.id.Ids;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.NamespacedId;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import java.util.Collections;
import java.util.List;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/internal/audit/AuditPublishTest.class */
public class AuditPublishTest {

    @ClassRule
    public static final KafkaTester KAFKA_TESTER = new KafkaTester(ImmutableMap.of("audit.enabled", "true"), Collections.emptyList(), 1, new String[0]);
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(AuditMessage.class, new AuditMessageTypeAdapter()).registerTypeAdapter(EntityId.class, new EntityIdTypeAdapter()).create();
    private static ZKClientService zkClient;
    private static KafkaClientService kafkaClient;

    @BeforeClass
    public static void init() throws Exception {
        Injector injector = AppFabricTestHelper.getInjector(KAFKA_TESTER.getCConf(), new AbstractModule() { // from class: co.cask.cdap.internal.audit.AuditPublishTest.1
            protected void configure() {
                install(new AuditModule().getDistributedModules());
                install(new ZKClientModule());
                install(new KafkaClientModule());
            }
        });
        zkClient = (ZKClientService) injector.getInstance(ZKClientService.class);
        zkClient.startAndWait();
        kafkaClient = (KafkaClientService) injector.getInstance(KafkaClientService.class);
        kafkaClient.startAndWait();
    }

    @AfterClass
    public static void stop() throws Exception {
        zkClient.stopAndWait();
        kafkaClient.startAndWait();
    }

    @Test
    public void testPublish() throws Exception {
        String namespace = NamespaceId.DEFAULT.getNamespace();
        String simpleName = WordCountApp.class.getSimpleName();
        ImmutableSet of = ImmutableSet.of(Ids.namespace(namespace).artifact(WordCountApp.class.getSimpleName(), "1"), Ids.namespace(namespace).app(simpleName), Ids.namespace(namespace).app(simpleName).flow(WordCountApp.WordCountFlow.class.getSimpleName()), Ids.namespace(namespace).app(simpleName).mr(WordCountApp.VoidMapReduceJob.class.getSimpleName()), Ids.namespace(namespace).app(simpleName).service(WordCountApp.WordFrequencyService.class.getSimpleName()), Ids.namespace(namespace).dataset("mydataset"), new EntityId[]{Ids.namespace(namespace).stream("text")});
        HashMultimap create = HashMultimap.create();
        create.putAll(AuditType.METADATA_CHANGE, of);
        create.putAll(AuditType.CREATE, ImmutableSet.of(Ids.namespace(namespace).dataset("mydataset"), Ids.namespace(namespace).stream("text")));
        AppFabricTestHelper.deployApplication(Id.Namespace.DEFAULT, WordCountApp.class, null, KAFKA_TESTER.getCConf());
        List<AuditMessage> publishedMessages = KAFKA_TESTER.getPublishedMessages(KAFKA_TESTER.getCConf().get("audit.kafka.topic"), 11, AuditMessage.class, GSON);
        HashMultimap create2 = HashMultimap.create();
        for (AuditMessage auditMessage : publishedMessages) {
            NamespacedId entityId = auditMessage.getEntityId();
            if (!(entityId instanceof NamespacedId) || !entityId.getNamespace().equals(NamespaceId.SYSTEM.getNamespace())) {
                if (entityId.getEntity() == EntityType.ARTIFACT && (entityId instanceof ArtifactId)) {
                    ArtifactId artifactId = (ArtifactId) entityId;
                    entityId = Ids.namespace(artifactId.getNamespace()).artifact(artifactId.getArtifact(), "1");
                }
                create2.put(auditMessage.getType(), entityId);
            }
        }
        Assert.assertEquals(create, create2);
    }
}
