/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rya.streams.kafka;

import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.Assert;

@DefaultAnnotation(value={NonNull.class})
public class RyaStreamsTestUtil {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static <T> void runStreamProcessingTest(KafkaTestInstanceRule kafka, String statementsTopic, String resultsTopic, TopologyBuilder builder, List<VisibilityStatement> statements, Set<T> expected, Class<? extends Deserializer<T>> expectedDeserializerClass) throws Exception {
        Objects.requireNonNull(kafka);
        Objects.requireNonNull(statementsTopic);
        Objects.requireNonNull(resultsTopic);
        Objects.requireNonNull(builder);
        Objects.requireNonNull(statements);
        Objects.requireNonNull(expected);
        Objects.requireNonNull(expectedDeserializerClass);
        kafka.createTopic(statementsTopic);
        kafka.createTopic(resultsTopic);
        Properties props = kafka.createBootstrapServerConfig();
        props.put("application.id", UUID.randomUUID().toString());
        props.put("auto.offset.reset", "earliest");
        streams.cleanUp();
        try (KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig((Map)props));){
            streams.start();
            Thread.sleep(6000L);
            try (Producer producer = KafkaTestUtil.makeProducer((KafkaTestInstanceRule)kafka, StringSerializer.class, VisibilityStatementSerializer.class);){
                new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements);
            }
            var10_10 = null;
            try (Consumer consumer = KafkaTestUtil.fromStartConsumer((KafkaTestInstanceRule)kafka, StringDeserializer.class, expectedDeserializerClass);){
                consumer.subscribe(Arrays.asList(resultsTopic));
                HashSet results = Sets.newHashSet((Iterable)KafkaTestUtil.pollForResults((int)500, (int)6, (int)expected.size(), (Consumer)consumer));
                Assert.assertEquals(expected, (Object)results);
            }
            catch (Throwable throwable) {
                var10_10 = throwable;
                throw throwable;
            }
        }
    }
}

