package co.cask.cdap.data2.metadata.publisher;

import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.namespace.guice.NamespaceClientRuntimeModule;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.data.runtime.SystemDatasetRuntimeModule;
import co.cask.cdap.data2.metadata.store.DefaultMetadataStore;
import co.cask.cdap.data2.metadata.store.MetadataStore;
import co.cask.cdap.kafka.KafkaTester;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.codec.NamespacedIdCodec;
import co.cask.cdap.proto.metadata.MetadataChangeRecord;
import co.cask.cdap.proto.metadata.MetadataRecord;
import co.cask.cdap.proto.metadata.MetadataScope;
import co.cask.tephra.runtime.TransactionInMemoryModule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/data2/metadata/publisher/KafkaMetadataChangePublisherTest.class */
public class KafkaMetadataChangePublisherTest {
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Id.NamespacedId.class, new NamespacedIdCodec()).create();

    @ClassRule
    public static final KafkaTester KAFKA_TESTER = new KafkaTester(ImmutableMap.of("metadata.updates.publish.enabled", "true"), ImmutableList.of(Modules.override(new Module[]{new DataSetsModules().getInMemoryModules()}).with(new Module[]{new AbstractModule() { // from class: co.cask.cdap.data2.metadata.publisher.KafkaMetadataChangePublisherTest.1
        protected void configure() {
            bind(MetadataStore.class).to(DefaultMetadataStore.class);
        }
    }}), new LocationRuntimeModule().getInMemoryModules(), new TransactionInMemoryModule(), new SystemDatasetRuntimeModule().getInMemoryModules(), new NamespaceClientRuntimeModule().getInMemoryModules()), 1, "metadata.updates.kafka.broker.list");
    private static MetadataChangePublisher publisher;

    @Before
    public void setup() throws IOException {
        publisher = (MetadataChangePublisher) KAFKA_TESTER.getInjector().getInstance(MetadataChangePublisher.class);
    }

    /* JADX WARN: Type inference failed for: r0v9, types: [co.cask.cdap.data2.metadata.publisher.KafkaMetadataChangePublisherTest$2] */
    @Test
    public void testPublish() throws InterruptedException {
        List<MetadataChangeRecord> generateMetadataChanges = generateMetadataChanges();
        Iterator<MetadataChangeRecord> it = generateMetadataChanges.iterator();
        while (it.hasNext()) {
            publisher.publish(it.next());
        }
        Assert.assertEquals(generateMetadataChanges, KAFKA_TESTER.getPublishedMessages(KAFKA_TESTER.getCConf().get("metadata.updates.kafka.topic"), generateMetadataChanges.size(), new TypeToken<MetadataChangeRecord>() { // from class: co.cask.cdap.data2.metadata.publisher.KafkaMetadataChangePublisherTest.2
        }.getType(), GSON));
    }

    private List<MetadataChangeRecord> generateMetadataChanges() {
        long currentTimeMillis = System.currentTimeMillis();
        ImmutableList.Builder builder = ImmutableList.builder();
        Id.DatasetInstance from = Id.DatasetInstance.from("ns1", "ds1");
        builder.add(new MetadataChangeRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of(), ImmutableSet.of()), new MetadataChangeRecord.MetadataDiffRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of("key1", "value1"), ImmutableSet.of("tag1")), new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of(), ImmutableSet.of())), currentTimeMillis - 1000));
        builder.add(new MetadataChangeRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of("key1", "value1"), ImmutableSet.of("tag1")), new MetadataChangeRecord.MetadataDiffRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of("key1", "v1"), ImmutableSet.of()), new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of("key1", "value1"), ImmutableSet.of())), currentTimeMillis - 800));
        builder.add(new MetadataChangeRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of("key1", "v1"), ImmutableSet.of("tag1")), new MetadataChangeRecord.MetadataDiffRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of("key2", "value2"), ImmutableSet.of()), new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of(), ImmutableSet.of())), currentTimeMillis - 600));
        builder.add(new MetadataChangeRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of("key1", "v1", "key2", "value2"), ImmutableSet.of("tag1")), new MetadataChangeRecord.MetadataDiffRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of(), ImmutableSet.of()), new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of("key1", "v1"), ImmutableSet.of())), currentTimeMillis - 400));
        builder.add(new MetadataChangeRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of("key2", "value2"), ImmutableSet.of("tag1")), new MetadataChangeRecord.MetadataDiffRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of(), ImmutableSet.of()), new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of(), ImmutableSet.of("tag1"))), currentTimeMillis - 200));
        builder.add(new MetadataChangeRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of("key2", "value2"), ImmutableSet.of()), new MetadataChangeRecord.MetadataDiffRecord(new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of(), ImmutableSet.of("tag1")), new MetadataRecord(from, MetadataScope.USER, ImmutableMap.of(), ImmutableSet.of())), currentTimeMillis));
        return builder.build();
    }
}
