package org.apache.spark.sql.kafka010;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.kafka010.KafkaContinuousTest;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.test.SharedSparkSessionBase;
import org.apache.spark.sql.test.TestSparkSession;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Tag;
import scala.Function0;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: KafkaContinuousSourceSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001e1AAA\u0002\u0001\u001d!)a\u0003\u0001C\u0001/\tQ2*\u00194lC\u000e{g\u000e^5ok>,8oU8ve\u000e,7+^5uK*\u0011A!B\u0001\tW\u000647.\u0019\u00192a)\u0011aaB\u0001\u0004gFd'B\u0001\u0005\n\u0003\u0015\u0019\b/\u0019:l\u0015\tQ1\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0019\u0005\u0019qN]4\u0004\u0001M\u0019\u0001aD\n\u0011\u0005A\tR\"A\u0002\n\u0005I\u0019!\u0001F&bM.\f7k\\;sG\u0016\u001cV/\u001b;f\u0005\u0006\u001cX\r\u0005\u0002\u0011)%\u0011Qc\u0001\u0002\u0014\u0017\u000647.Y\"p]RLg.^8vgR+7\u000f^\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003a\u0001\"\u0001\u0005\u0001")
/* loaded from: input_file:org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.class */
public class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase implements KafkaContinuousTest {
    private final Trigger defaultTrigger;
    private final KafkaContinuousTest.TasksEndedListener org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener;

    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public /* synthetic */ void org$apache$spark$sql$kafka010$KafkaContinuousTest$$super$beforeEach() {
        SharedSparkSessionBase.beforeEach$(this);
    }

    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public /* synthetic */ void org$apache$spark$sql$kafka010$KafkaContinuousTest$$super$afterEach() {
        SharedSparkSessionBase.afterEach$(this);
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public TestSparkSession createSparkSession() {
        TestSparkSession createSparkSession;
        createSparkSession = createSparkSession();
        return createSparkSession;
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void setTopicPartitions(String str, int i, StreamExecution streamExecution) {
        setTopicPartitions(str, i, streamExecution);
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void beforeEach() {
        beforeEach();
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void afterEach() {
        afterEach();
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSourceTest, org.apache.spark.sql.kafka010.KafkaContinuousTest
    public Trigger defaultTrigger() {
        return this.defaultTrigger;
    }

    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public KafkaContinuousTest.TasksEndedListener org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener() {
        return this.org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener;
    }

    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public void org$apache$spark$sql$kafka010$KafkaContinuousTest$_setter_$defaultTrigger_$eq(Trigger trigger) {
        this.defaultTrigger = trigger;
    }

    @Override // org.apache.spark.sql.kafka010.KafkaContinuousTest
    public final void org$apache$spark$sql$kafka010$KafkaContinuousTest$_setter_$org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener_$eq(KafkaContinuousTest.TasksEndedListener tasksEndedListener) {
        this.org$apache$spark$sql$kafka010$KafkaContinuousTest$$tasksEndedListener = tasksEndedListener;
    }

    public static final /* synthetic */ int $anonfun$new$4(Tuple2 tuple2) {
        return new StringOps(Predef$.MODULE$.augmentString((String) tuple2._2())).toInt();
    }

    public static final /* synthetic */ RecordMetadata $anonfun$new$5(KafkaProducer kafkaProducer, String str, int i) {
        return (RecordMetadata) kafkaProducer.send(new ProducerRecord(str, BoxesRunTime.boxToInteger(i).toString())).get();
    }

    public static final /* synthetic */ RecordMetadata $anonfun$new$8(KafkaProducer kafkaProducer, String str, int i) {
        return (RecordMetadata) kafkaProducer.send(new ProducerRecord(str, BoxesRunTime.boxToInteger(i).toString())).get();
    }

    public static final /* synthetic */ RecordMetadata $anonfun$new$10(KafkaProducer kafkaProducer, String str, int i) {
        return (RecordMetadata) kafkaProducer.send(new ProducerRecord(str, BoxesRunTime.boxToInteger(i).toString())).get();
    }

    public static final /* synthetic */ void $anonfun$new$3(KafkaContinuousSourceSuite kafkaContinuousSourceSuite, String str, String str2, KafkaProducer kafkaProducer) {
        final KafkaContinuousSourceSuite kafkaContinuousSourceSuite2 = null;
        StreamingQuery start = kafkaContinuousSourceSuite.spark().readStream().format("kafka").option("kafka.bootstrap.servers", kafkaContinuousSourceSuite.testUtils().brokerAddress()).option("kafka.isolation.level", "read_committed").option("startingOffsets", "earliest").option("subscribe", str).load().selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{"CAST(key AS STRING)", "CAST(value AS STRING)"})).as(kafkaContinuousSourceSuite.testImplicits().newProductEncoder(package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaContinuousSourceSuite.class.getClassLoader()), new TypeCreator(kafkaContinuousSourceSuite2) { // from class: org.apache.spark.sql.kafka010.KafkaContinuousSourceSuite$$typecreator4$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple2"), new $colon.colon(universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), new $colon.colon(universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), Nil$.MODULE$)));
            }
        }))).map(tuple2 -> {
            return BoxesRunTime.boxToInteger($anonfun$new$4(tuple2));
        }, kafkaContinuousSourceSuite.testImplicits().newIntEncoder()).writeStream().format("memory").queryName(str2).trigger(new ContinuousTrigger(100L)).start();
        try {
            kafkaProducer.beginTransaction();
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).foreach(obj -> {
                return $anonfun$new$5(kafkaProducer, str, BoxesRunTime.unboxToInt(obj));
            });
            Dataset table = kafkaContinuousSourceSuite.spark().table(str2);
            kafkaContinuousSourceSuite.assertionsHelper().macroAssert(Bool$.MODULE$.unaryMacroBool(table, "isEmpty", table.isEmpty(), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 62));
            kafkaProducer.commitTransaction();
            kafkaContinuousSourceSuite.eventually(kafkaContinuousSourceSuite.timeout(kafkaContinuousSourceSuite.streamingTimeout()), (Function0) () -> {
                kafkaContinuousSourceSuite.checkAnswer(() -> {
                    return kafkaContinuousSourceSuite.spark().table(str2);
                }, kafkaContinuousSourceSuite.testImplicits().localSeqToDatasetHolder(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5), kafkaContinuousSourceSuite.testImplicits().newIntEncoder()).toDF());
            }, kafkaContinuousSourceSuite.patienceConfig(), new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 66));
            kafkaProducer.beginTransaction();
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(6), 10).foreach(obj2 -> {
                return $anonfun$new$8(kafkaProducer, str, BoxesRunTime.unboxToInt(obj2));
            });
            kafkaProducer.abortTransaction();
            kafkaContinuousSourceSuite.checkAnswer(() -> {
                return kafkaContinuousSourceSuite.spark().table(str2);
            }, kafkaContinuousSourceSuite.testImplicits().localSeqToDatasetHolder(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5), kafkaContinuousSourceSuite.testImplicits().newIntEncoder()).toDF());
            kafkaProducer.beginTransaction();
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(11), 15).foreach(obj3 -> {
                return $anonfun$new$10(kafkaProducer, str, BoxesRunTime.unboxToInt(obj3));
            });
            kafkaProducer.commitTransaction();
            kafkaContinuousSourceSuite.eventually(kafkaContinuousSourceSuite.timeout(kafkaContinuousSourceSuite.streamingTimeout()), (Function0) () -> {
                kafkaContinuousSourceSuite.checkAnswer(() -> {
                    return kafkaContinuousSourceSuite.spark().table(str2);
                }, kafkaContinuousSourceSuite.testImplicits().localSeqToDatasetHolder((Seq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).$plus$plus(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(11), 15), IndexedSeq$.MODULE$.canBuildFrom()), kafkaContinuousSourceSuite.testImplicits().newIntEncoder()).toDF());
            }, kafkaContinuousSourceSuite.patienceConfig(), new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 86));
        } finally {
            start.stop();
        }
    }

    public static final /* synthetic */ int $anonfun$new$16(Tuple2 tuple2) {
        return new StringOps(Predef$.MODULE$.augmentString((String) tuple2._2())).toInt();
    }

    public static final /* synthetic */ RecordMetadata $anonfun$new$17(KafkaProducer kafkaProducer, String str, int i) {
        return (RecordMetadata) kafkaProducer.send(new ProducerRecord(str, BoxesRunTime.boxToInteger(i).toString())).get();
    }

    public static final /* synthetic */ RecordMetadata $anonfun$new$22(KafkaProducer kafkaProducer, String str, int i) {
        return (RecordMetadata) kafkaProducer.send(new ProducerRecord(str, BoxesRunTime.boxToInteger(i).toString())).get();
    }

    public static final /* synthetic */ RecordMetadata $anonfun$new$25(KafkaProducer kafkaProducer, String str, int i) {
        return (RecordMetadata) kafkaProducer.send(new ProducerRecord(str, BoxesRunTime.boxToInteger(i).toString())).get();
    }

    public static final /* synthetic */ void $anonfun$new$15(KafkaContinuousSourceSuite kafkaContinuousSourceSuite, String str, String str2, KafkaProducer kafkaProducer) {
        final KafkaContinuousSourceSuite kafkaContinuousSourceSuite2 = null;
        StreamingQuery start = kafkaContinuousSourceSuite.spark().readStream().format("kafka").option("kafka.bootstrap.servers", kafkaContinuousSourceSuite.testUtils().brokerAddress()).option("kafka.isolation.level", "read_uncommitted").option("startingOffsets", "earliest").option("subscribe", str).load().selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{"CAST(key AS STRING)", "CAST(value AS STRING)"})).as(kafkaContinuousSourceSuite.testImplicits().newProductEncoder(package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaContinuousSourceSuite.class.getClassLoader()), new TypeCreator(kafkaContinuousSourceSuite2) { // from class: org.apache.spark.sql.kafka010.KafkaContinuousSourceSuite$$typecreator5$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Tuple2"), new $colon.colon(universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), new $colon.colon(universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$), Nil$.MODULE$)));
            }
        }))).map(tuple2 -> {
            return BoxesRunTime.boxToInteger($anonfun$new$16(tuple2));
        }, kafkaContinuousSourceSuite.testImplicits().newIntEncoder()).writeStream().format("memory").queryName(str2).trigger(new ContinuousTrigger(100L)).start();
        try {
            kafkaProducer.beginTransaction();
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).foreach(obj -> {
                return $anonfun$new$17(kafkaProducer, str, BoxesRunTime.unboxToInt(obj));
            });
            kafkaContinuousSourceSuite.eventually(kafkaContinuousSourceSuite.timeout(kafkaContinuousSourceSuite.streamingTimeout()), (Function0) () -> {
                kafkaContinuousSourceSuite.checkAnswer(() -> {
                    return kafkaContinuousSourceSuite.spark().table(str2);
                }, kafkaContinuousSourceSuite.testImplicits().localSeqToDatasetHolder(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5), kafkaContinuousSourceSuite.testImplicits().newIntEncoder()).toDF());
            }, kafkaContinuousSourceSuite.patienceConfig(), new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 127));
            kafkaProducer.commitTransaction();
            kafkaContinuousSourceSuite.eventually(kafkaContinuousSourceSuite.timeout(kafkaContinuousSourceSuite.streamingTimeout()), (Function0) () -> {
                kafkaContinuousSourceSuite.checkAnswer(() -> {
                    return kafkaContinuousSourceSuite.spark().table(str2);
                }, kafkaContinuousSourceSuite.testImplicits().localSeqToDatasetHolder(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5), kafkaContinuousSourceSuite.testImplicits().newIntEncoder()).toDF());
            }, kafkaContinuousSourceSuite.patienceConfig(), new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 134));
            kafkaProducer.beginTransaction();
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(6), 10).foreach(obj2 -> {
                return $anonfun$new$22(kafkaProducer, str, BoxesRunTime.unboxToInt(obj2));
            });
            kafkaProducer.abortTransaction();
            kafkaContinuousSourceSuite.eventually(kafkaContinuousSourceSuite.timeout(kafkaContinuousSourceSuite.streamingTimeout()), (Function0) () -> {
                kafkaContinuousSourceSuite.checkAnswer(() -> {
                    return kafkaContinuousSourceSuite.spark().table(str2);
                }, kafkaContinuousSourceSuite.testImplicits().localSeqToDatasetHolder(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 10), kafkaContinuousSourceSuite.testImplicits().newIntEncoder()).toDF());
            }, kafkaContinuousSourceSuite.patienceConfig(), new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 145));
            kafkaProducer.beginTransaction();
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(11), 15).foreach(obj3 -> {
                return $anonfun$new$25(kafkaProducer, str, BoxesRunTime.unboxToInt(obj3));
            });
            kafkaContinuousSourceSuite.eventually(kafkaContinuousSourceSuite.timeout(kafkaContinuousSourceSuite.streamingTimeout()), (Function0) () -> {
                kafkaContinuousSourceSuite.checkAnswer(() -> {
                    return kafkaContinuousSourceSuite.spark().table(str2);
                }, kafkaContinuousSourceSuite.testImplicits().localSeqToDatasetHolder(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 15), kafkaContinuousSourceSuite.testImplicits().newIntEncoder()).toDF());
            }, kafkaContinuousSourceSuite.patienceConfig(), new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 155));
            kafkaProducer.commitTransaction();
            kafkaContinuousSourceSuite.eventually(kafkaContinuousSourceSuite.timeout(kafkaContinuousSourceSuite.streamingTimeout()), (Function0) () -> {
                kafkaContinuousSourceSuite.checkAnswer(() -> {
                    return kafkaContinuousSourceSuite.spark().table(str2);
                }, kafkaContinuousSourceSuite.testImplicits().localSeqToDatasetHolder(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 15), kafkaContinuousSourceSuite.testImplicits().newIntEncoder()).toDF());
            }, kafkaContinuousSourceSuite.patienceConfig(), new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 162));
        } finally {
            start.stop();
        }
    }

    public KafkaContinuousSourceSuite() {
        KafkaContinuousTest.$init$(this);
        test("read Kafka transactional messages: read_committed", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            String str = "kafka_continuous_source_test";
            this.withTable(Predef$.MODULE$.wrapRefArray(new String[]{"kafka_continuous_source_test"}), () -> {
                String newTopic = this.newTopic();
                this.testUtils().createTopic(newTopic);
                this.testUtils().withTransactionalProducer(kafkaProducer -> {
                    $anonfun$new$3(this, newTopic, str, kafkaProducer);
                    return BoxedUnit.UNIT;
                });
            });
        }, new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 31));
        test("read Kafka transactional messages: read_uncommitted", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            String str = "kafka_continuous_source_test";
            this.withTable(Predef$.MODULE$.wrapRefArray(new String[]{"kafka_continuous_source_test"}), () -> {
                String newTopic = this.newTopic();
                this.testUtils().createTopic(newTopic);
                this.testUtils().withTransactionalProducer(kafkaProducer -> {
                    $anonfun$new$15(this, newTopic, str, kafkaProducer);
                    return BoxedUnit.UNIT;
                });
            });
        }, new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 97));
        test("SPARK-27494: read kafka record containing null key/values.", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.testNullableKeyValue(new ContinuousTrigger(100L));
        }, new Position("KafkaContinuousSourceSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 173));
    }
}
