package com.lightbend.kafka.scala.streams;

import com.lightbend.kafka.scala.server.KafkaLocalServer;
import com.lightbend.kafka.scala.server.MessageListener;
import com.lightbend.kafka.scala.server.MessageListener$;
import com.lightbend.kafka.scala.server.MessageSender$;
import com.lightbend.kafka.scala.streams.StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro;
import java.lang.Thread;
import java.util.Properties;
import minitest.api.SourceLocation;
import minitest.api.Void;
import minitest.api.Void$;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Seq$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.math.Ordering$String$;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Random$;

/* compiled from: StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala */
/* loaded from: input_file:com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$$anonfun$6.class */
public final class StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$$anonfun$6 extends AbstractFunction1<KafkaLocalServer, Void> implements Serializable {
    public static final long serialVersionUID = 0;

    public final Void apply(KafkaLocalServer kafkaLocalServer) {
        kafkaLocalServer.createTopic(StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.userClicksTopic());
        kafkaLocalServer.createTopic(StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.userRegionsTopic());
        kafkaLocalServer.createTopic(StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.outputTopic());
        Properties properties = new Properties();
        properties.put("application.id", new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"stream-table-join-scala-integration-test-implicit-serdes-", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(Random$.MODULE$.nextInt(100))})));
        properties.put("client.id", "join-scala-integration-test-implicit-serdes-standard-consumer");
        properties.put("bootstrap.servers", StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.brokers());
        properties.put("commit.interval.ms", "100");
        properties.put("state.dir", StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.localStateDir());
        StreamsBuilderS streamsBuilderS = new StreamsBuilderS(StreamsBuilderS$.MODULE$.$lessinit$greater$default$1());
        streamsBuilderS.stream(StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.userClicksTopic(), ImplicitConversions$.MODULE$.consumedFromSerde(DefaultSerdes$.MODULE$.stringSerde(), StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.userClicksSerde())).leftJoin(streamsBuilderS.table(StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.userRegionsTopic(), ImplicitConversions$.MODULE$.consumedFromSerde(DefaultSerdes$.MODULE$.stringSerde(), DefaultSerdes$.MODULE$.stringSerde())), new StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$$anonfun$6$$anonfun$7(this), ImplicitConversions$.MODULE$.joinedFromKVOSerde(DefaultSerdes$.MODULE$.stringSerde(), StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.userClicksSerde(), DefaultSerdes$.MODULE$.stringSerde())).map(new StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$$anonfun$6$$anonfun$8(this)).groupByKey(ImplicitConversions$.MODULE$.serializedFromSerde(DefaultSerdes$.MODULE$.stringSerde(), DefaultSerdes$.MODULE$.longSerde())).reduce(new StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$$anonfun$6$$anonfun$1(this)).toStream().to(StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.outputTopic(), ImplicitConversions$.MODULE$.producedFromSerde(DefaultSerdes$.MODULE$.stringSerde(), DefaultSerdes$.MODULE$.longSerde()));
        final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilderS.build(), properties);
        kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(this, kafkaStreams) { // from class: com.lightbend.kafka.scala.streams.StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$$anonfun$6$$anon$9
            private final KafkaStreams streams$1;

            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                try {
                    try {
                        Predef$.MODULE$.println(new Tuple2(new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Stream terminated because of uncaught exception .. Shutting "})).s(Nil$.MODULE$)).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"down app"})).s(Nil$.MODULE$)).toString(), th));
                        th.printStackTrace();
                        this.streams$1.close();
                        Predef$.MODULE$.println(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Exiting application after streams close (", ")"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxedUnit.UNIT})));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } finally {
                    Predef$.MODULE$.println("Exiting application ..");
                    System.exit(-1);
                }
            }

            {
                this.streams$1 = kafkaStreams;
            }
        });
        kafkaStreams.start();
        StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.userRegions().foreach(new StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$$anonfun$6$$anonfun$apply$3(this, MessageSender$.MODULE$.apply(StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.brokers(), StringSerializer.class.getName(), StringSerializer.class.getName())));
        ((IterableLike) StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.userClicks().map(new StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$$anonfun$6$$anonfun$apply$4(this), Seq$.MODULE$.canBuildFrom())).foreach(new StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$$anonfun$6$$anonfun$apply$7(this, MessageSender$.MODULE$.apply(StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.brokers(), StringSerializer.class.getName(), ByteArraySerializer.class.getName())));
        MessageListener apply = MessageListener$.MODULE$.apply(StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.brokers(), StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.outputTopic(), "join-scala-integration-test-standard-consumer", StringDeserializer.class.getName(), LongDeserializer.class.getName(), new StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.RecordProcessor());
        List waitUntilMinKeyValueRecordsReceived = apply.waitUntilMinKeyValueRecordsReceived(StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.expectedClicksPerRegion().size(), 30000L, apply.waitUntilMinKeyValueRecordsReceived$default$3(), apply.waitUntilMinKeyValueRecordsReceived$default$4());
        kafkaStreams.close();
        Void$ void$ = Void$.MODULE$;
        StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.assertEquals(waitUntilMinKeyValueRecordsReceived.sortBy(new StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$$anonfun$6$$anonfun$apply$8(this), Ordering$String$.MODULE$), StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$.MODULE$.expectedClicksPerRegion().sortBy(new StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro$$anonfun$6$$anonfun$apply$9(this), Ordering$String$.MODULE$), new SourceLocation(Option$.MODULE$.apply("StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala"), Option$.MODULE$.apply("/Users/debasishghosh/lightbend/kafka-streams-scala/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala"), 208));
        return void$.toVoid(BoxedUnit.UNIT, new SourceLocation(Option$.MODULE$.apply("StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala"), Option$.MODULE$.apply("/Users/debasishghosh/lightbend/kafka-streams-scala/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala"), 208));
    }
}
