package org.apache.rya.indexing.pcj.fluo.integration;

import com.google.common.collect.Sets;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import javax.xml.datatype.DatatypeFactory;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Span;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase;
import org.apache.rya.periodic.notification.notification.CommandNotification;
import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.class */
/**
 * Integration test that registers a periodic query through {@link CreatePeriodicQuery}, loads
 * observation statements, removes the query again through {@link DeletePeriodicQuery}, and then
 * checks both the number of rows left in the Fluo table and the command notifications that were
 * published to Kafka.
 */
public class CreateDeletePeriodicPCJ extends KafkaExportITBase {

    /** Kafka broker address shared by the notification producer and consumer. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /** Topic on which the registration client publishes its command notifications. */
    private static final String NOTIFICATION_TOPIC = "notification_topic";

    /** Timeout, in milliseconds, handed to the single consumer poll. */
    private static final int POLL_TIMEOUT_MS = 7000;

    /** Number of observations loaded by the test; each one contributes two statements. */
    private static final int OBSERVATION_COUNT = 4;

    /** Gap, in minutes, between the timestamps of consecutive observations. */
    private static final long OBSERVATION_SPACING_MINUTES = 30L;

    /**
     * Creates a periodic count query, loads four observations whose timestamps lie 30, 60, 90 and
     * 120 minutes in the past, and runs the create/delete round trip expecting 30 Fluo rows once
     * the data has been loaded.
     *
     * @throws Exception if any of the Fluo, Accumulo or Kafka interactions fail
     */
    @Test
    public void deletePeriodicPCJ() throws Exception {
        final String query = "prefix function: <http://org.apache.rya/function#> "
                + "prefix time: <http://www.w3.org/2006/time#> "
                + "select (count(?obs) as ?total) where {"
                + "Filter(function:periodic(?time, 2, .5, time:hours)) "
                + "?obs <uri:hasTime> ?time. "
                + "?obs <uri:hasId> ?id }";

        final SimpleValueFactory valueFactory = SimpleValueFactory.getInstance();
        final DatatypeFactory datatypeFactory = DatatypeFactory.newInstance();

        final Set<Statement> statements = new HashSet<>();
        ZonedDateTime observationTime = ZonedDateTime.now();
        for (int n = 1; n <= OBSERVATION_COUNT; n++) {
            // Each observation is stamped 30 minutes earlier than the previous one.
            observationTime = observationTime.minusMinutes(OBSERVATION_SPACING_MINUTES);
            final String timestamp = observationTime.format(DateTimeFormatter.ISO_INSTANT);

            statements.add(valueFactory.createStatement(
                    valueFactory.createIRI("urn:obs_" + n),
                    valueFactory.createIRI("uri:hasTime"),
                    valueFactory.createLiteral(datatypeFactory.newXMLGregorianCalendar(timestamp))));
            statements.add(valueFactory.createStatement(
                    valueFactory.createIRI("urn:obs_" + n),
                    valueFactory.createIRI("uri:hasId"),
                    valueFactory.createLiteral("id_" + n)));
        }

        runTest(query, statements, 30);
    }

    /**
     * Runs the create / load / delete round trip for one periodic query and verifies its effects.
     *
     * @param query the SPARQL text of the periodic query to register
     * @param statements the statements to load after the query has been registered
     * @param expectedEntries the number of Fluo rows expected once the statements are loaded
     * @throws Exception if any of the Fluo, Accumulo or Kafka interactions fail
     */
    private void runTest(String query, Collection<Statement> statements, int expectedEntries) throws Exception {
        // Both the Fluo client and the Kafka producer are released even when an assertion fails;
        // a failure while closing is attached as suppressed instead of replacing the real error.
        try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration());
                KafkaProducer<String, CommandNotification> producer = getNotificationProducer(BOOTSTRAP_SERVERS)) {
            final AccumuloPeriodicQueryResultStorage storage =
                    new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), "test_");
            final KafkaNotificationRegistrationClient registrationClient =
                    new KafkaNotificationRegistrationClient(NOTIFICATION_TOPIC, producer);

            final String queryId = new CreatePeriodicQuery(fluoClient, storage)
                    .createPeriodicQuery(query, registrationClient)
                    .getQueryId();

            loadData(statements);

            // The loaded data must be visible in the Fluo table.
            Assert.assertEquals(expectedEntries, getFluoTableEntries(fluoClient).size());

            new DeletePeriodicQuery(fluoClient, storage)
                    .deletePeriodicQuery(FluoQueryUtils.convertFluoQueryIdToPcjId(queryId), registrationClient);
            getMiniFluo().waitForObservers();

            // After the delete has been processed exactly one row is expected to remain.
            Assert.assertEquals(1L, getFluoTableEntries(fluoClient).size());

            final Set<CommandNotification> notifications;
            try (KafkaConsumer<String, CommandNotification> consumer = makeNotificationConsumer(NOTIFICATION_TOPIC)) {
                notifications = getKafkaNotifications(NOTIFICATION_TOPIC, POLL_TIMEOUT_MS, consumer);
            }
            Assert.assertEquals(2L, notifications.size());

            final Set<String> notificationIds = new HashSet<>();
            boolean sawAdd = false;
            boolean sawDelete = false;
            for (CommandNotification notification : notifications) {
                notificationIds.add(notification.getId());
                if (notification.getCommand() == CommandNotification.Command.ADD) {
                    sawAdd = true;
                }
                if (notification.getCommand() == CommandNotification.Command.DELETE) {
                    sawDelete = true;
                }
            }

            // Every notification must carry the same id, and both commands must be present.
            Assert.assertEquals(1L, notificationIds.size());
            Assert.assertTrue(sawAdd);
            Assert.assertTrue(sawDelete);
        }
    }

    /**
     * Reads the row key of every row currently present in the Fluo table.
     *
     * @param fluoClient the client used to open a snapshot of the table
     * @return the row keys in the order the scanner returns them
     */
    private List<Bytes> getFluoTableEntries(FluoClient fluoClient) {
        try (Snapshot snapshot = fluoClient.newSnapshot()) {
            final List<Bytes> rows = new ArrayList<>();
            // An empty prefix matches every row.
            for (ColumnScanner columnScanner : snapshot.scanner().over(Span.prefix("")).byRow().build()) {
                rows.add(columnScanner.getRow());
            }
            return rows;
        }
    }

    /**
     * Builds a Kafka producer that writes {@link CommandNotification} values with string keys.
     * The caller is responsible for closing the returned producer.
     *
     * @param bootstrapServers the value to use for the {@code bootstrap.servers} setting
     * @return a newly created producer
     */
    private KafkaProducer<String, CommandNotification> getNotificationProducer(String bootstrapServers) {
        final Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", bootstrapServers);
        producerProps.setProperty("key.serializer", StringSerializer.class.getName());
        producerProps.setProperty("value.serializer", CommandNotificationSerializer.class.getName());
        return new KafkaProducer<>(producerProps);
    }

    /**
     * Builds a Kafka consumer for {@link CommandNotification} values that is already subscribed
     * to the given topic. The caller is responsible for closing the returned consumer.
     *
     * @param topic the topic to subscribe to
     * @return a newly created, subscribed consumer
     */
    private KafkaConsumer<String, CommandNotification> makeNotificationConsumer(String topic) {
        final Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        consumerProps.setProperty("group.id", "group0");
        consumerProps.setProperty("client.id", "consumer0");
        consumerProps.setProperty("key.deserializer", StringDeserializer.class.getName());
        // CommandNotificationSerializer is registered on the consuming side as well.
        consumerProps.setProperty("value.deserializer", CommandNotificationSerializer.class.getName());
        // Read from the start of the topic so notifications sent before subscribing are seen.
        consumerProps.setProperty("auto.offset.reset", "earliest");

        final KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    /**
     * Performs a single poll on the consumer and collects the distinct notification values.
     *
     * @param topic the topic name; only checked for null, since the consumer is expected to be
     *     subscribed already
     * @param pollMs the poll timeout in milliseconds
     * @param consumer the consumer to poll
     * @return the distinct notifications returned by the poll, possibly empty
     */
    private Set<CommandNotification> getKafkaNotifications(String topic, int pollMs, KafkaConsumer<String, CommandNotification> consumer) {
        Objects.requireNonNull(topic);
        final Set<CommandNotification> notifications = new HashSet<>();
        for (ConsumerRecord<String, CommandNotification> consumerRecord : consumer.poll(pollMs)) {
            notifications.add(consumerRecord.value());
        }
        return notifications;
    }
}
