package org.apache.rya.pcj.fluo.test.base;

import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Objects;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode$Disabled$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.recipes.test.AccumuloExportITBase;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.api.client.CreatePCJ;
import org.apache.rya.api.client.Install;
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer;
import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.apache.rya.sail.config.RyaSailFactory;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.sail.Sail;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.class */
public class KafkaExportITBase extends AccumuloExportITBase {
    protected static final String RYA_INSTANCE_NAME = "test_";
    private static final String ZKHOST = "127.0.0.1";
    private static final String BROKERHOST = "127.0.0.1";
    private static final String BROKERPORT = "9092";
    private ZkUtils zkUtils;
    private KafkaServer kafkaServer;
    private EmbeddedZookeeper zkServer;
    private ZkClient zkClient;
    private RyaSailRepository ryaSailRepo = null;
    private AccumuloRyaDAO dao = null;

    protected void preFluoInitHook() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ObserverSpecification(TripleObserver.class.getName()));
        arrayList.add(new ObserverSpecification(BatchObserver.class.getName()));
        arrayList.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
        arrayList.add(new ObserverSpecification(JoinObserver.class.getName()));
        arrayList.add(new ObserverSpecification(FilterObserver.class.getName()));
        arrayList.add(new ObserverSpecification(AggregationObserver.class.getName()));
        arrayList.add(new ObserverSpecification(ProjectionObserver.class.getName()));
        arrayList.add(new ObserverSpecification(ConstructQueryResultObserver.class.getName()));
        HashMap hashMap = new HashMap();
        KafkaBindingSetExporterParameters kafkaBindingSetExporterParameters = new KafkaBindingSetExporterParameters(hashMap);
        kafkaBindingSetExporterParameters.setUseKafkaBindingSetExporter(true);
        kafkaBindingSetExporterParameters.setKafkaBootStrapServers("127.0.0.1:9092");
        new KafkaSubGraphExporterParameters(hashMap).setUseKafkaSubgraphExporter(true);
        arrayList.add(new ObserverSpecification(QueryResultObserver.class.getName(), hashMap));
        super.getFluoConfiguration().addObservers(arrayList);
    }

    @Before
    public void setupKafka() throws Exception {
        installRyaInstance();
        this.zkServer = new EmbeddedZookeeper();
        String str = "127.0.0.1:" + this.zkServer.port();
        this.zkClient = new ZkClient(str, 30000, 30000, ZKStringSerializer$.MODULE$);
        this.zkUtils = ZkUtils.apply(this.zkClient, false);
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", str);
        properties.setProperty("broker.id", "0");
        properties.setProperty("log.dirs", Files.createTempDirectory("kafka-", new FileAttribute[0]).toAbsolutePath().toString());
        properties.setProperty("listeners", "PLAINTEXT://127.0.0.1:9092");
        this.kafkaServer = TestUtils.createServer(new KafkaConfig(properties), new MockTime());
    }

    @After
    public void teardownRya() {
        MiniAccumuloCluster miniAccumuloCluster = getMiniAccumuloCluster();
        try {
            AccumuloRyaClientFactory.build(new AccumuloConnectionDetails("root", "secret".toCharArray(), miniAccumuloCluster.getInstanceName(), miniAccumuloCluster.getZooKeepers()), super.getAccumuloConnector()).getUninstall().uninstall(RYA_INSTANCE_NAME);
            if (this.ryaSailRepo != null) {
                this.ryaSailRepo.shutDown();
            }
            if (this.dao != null) {
                this.dao.destroy();
            }
        } catch (Exception e) {
            System.out.println("Encountered the following Exception when shutting down Rya: " + e.getMessage());
        }
    }

    @After
    public void clearCaches() {
        StatementPatternIdCacheSupplier.clear();
        MetadataCacheSupplier.clear();
    }

    private void installRyaInstance() throws Exception {
        MiniAccumuloCluster miniAccumuloCluster = super.getMiniAccumuloCluster();
        String instanceName = miniAccumuloCluster.getInstanceName();
        String zooKeepers = miniAccumuloCluster.getZooKeepers();
        AccumuloRyaClientFactory.build(new AccumuloConnectionDetails("root", "secret".toCharArray(), instanceName, zooKeepers), super.getAccumuloConnector()).getInstall().install(RYA_INSTANCE_NAME, Install.InstallConfiguration.builder().setEnableTableHashPrefix(false).setEnableFreeTextIndex(false).setEnableEntityCentricIndex(false).setEnableGeoIndex(false).setEnableTemporalIndex(false).setEnablePcjIndex(true).setFluoPcjAppName(super.getFluoConfiguration().getApplicationName()).build());
        AccumuloRdfConfiguration makeConfig = makeConfig(instanceName, zooKeepers);
        Sail ryaSailFactory = RyaSailFactory.getInstance(makeConfig);
        this.dao = RyaSailFactory.getAccumuloDAOWithUpdatedConfig(makeConfig);
        this.ryaSailRepo = new RyaSailRepository(ryaSailFactory);
    }

    protected AccumuloRdfConfiguration makeConfig(String str, String str2) {
        AccumuloRdfConfiguration accumuloRdfConfiguration = new AccumuloRdfConfiguration();
        accumuloRdfConfiguration.setTablePrefix(RYA_INSTANCE_NAME);
        accumuloRdfConfiguration.setAccumuloUser("root");
        accumuloRdfConfiguration.setAccumuloPassword("secret");
        accumuloRdfConfiguration.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName());
        accumuloRdfConfiguration.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
        accumuloRdfConfiguration.setAuths(new String[]{""});
        accumuloRdfConfiguration.set("sc.use_pcj", "true");
        accumuloRdfConfiguration.set("sc.use.updater", "true");
        accumuloRdfConfiguration.set("rya.indexing.pcj.fluo.fluoAppName", super.getFluoConfiguration().getApplicationName());
        accumuloRdfConfiguration.set("rya.indexing.pcj.storageType", PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
        accumuloRdfConfiguration.set("rya.indexing.pcj.updaterType", PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
        accumuloRdfConfiguration.setDisplayQueryPlan(true);
        return accumuloRdfConfiguration;
    }

    protected RyaSailRepository getRyaSailRepository() throws Exception {
        return this.ryaSailRepo;
    }

    protected AccumuloRyaDAO getRyaDAO() {
        return this.dao;
    }

    @After
    public void teardownKafka() {
        if (this.kafkaServer != null) {
            this.kafkaServer.shutdown();
        }
        if (this.zkClient != null) {
            this.zkClient.close();
        }
        if (this.zkServer != null) {
            this.zkServer.shutdown();
        }
    }

    @Test
    public void embeddedKafkaTest() throws Exception {
        AdminUtils.createTopic(this.zkUtils, "testTopic", 1, 1, new Properties(), RackAwareMode$Disabled$.MODULE$);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        Properties properties2 = new Properties();
        properties2.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties2.setProperty("group.id", "group0");
        properties2.setProperty("client.id", "consumer0");
        properties2.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties2.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("auto.offset.reset", "earliest");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties2);
        kafkaConsumer.subscribe(Arrays.asList("testTopic"));
        kafkaProducer.send(new ProducerRecord("testTopic", 42, "test-message".getBytes(StandardCharsets.UTF_8)));
        kafkaProducer.close();
        ConsumerRecords poll = kafkaConsumer.poll(3000L);
        Assert.assertEquals(1L, poll.count());
        ConsumerRecord consumerRecord = (ConsumerRecord) poll.iterator().next();
        Assert.assertEquals(42L, ((Integer) consumerRecord.key()).intValue());
        Assert.assertEquals("test-message", new String((byte[]) consumerRecord.value(), StandardCharsets.UTF_8));
        kafkaConsumer.close();
    }

    protected KafkaConsumer<String, VisibilityBindingSet> makeConsumer(String str) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("group.id", "group0");
        properties.setProperty("client.id", "consumer0");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", KryoVisibilityBindingSetSerializer.class.getName());
        properties.put("auto.offset.reset", "earliest");
        KafkaConsumer<String, VisibilityBindingSet> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList(str));
        return kafkaConsumer;
    }

    protected String loadDataAndCreateQuery(String str, Collection<Statement> collection) throws Exception {
        Objects.requireNonNull(str);
        Objects.requireNonNull(collection);
        Instance connector = super.getAccumuloConnector().getInstance();
        String createPCJ = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails("root", "secret".toCharArray(), connector.getInstanceName(), connector.getZooKeepers()), super.getAccumuloConnector()).getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, str, Sets.newHashSet(new CreatePCJ.ExportStrategy[]{CreatePCJ.ExportStrategy.KAFKA}));
        loadData(collection);
        return createPCJ;
    }

    protected void loadData(Collection<Statement> collection) throws Exception {
        Objects.requireNonNull(collection);
        SailRepositoryConnection connection = getRyaSailRepository().getConnection();
        connection.begin();
        connection.add(collection, new Resource[0]);
        connection.commit();
        connection.close();
        super.getMiniFluo().waitForObservers();
    }
}
