package org.apache.spark.sql.kafka010;

import java.util.HashMap;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.AttributeReference$;
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection$;
import org.apache.spark.sql.execution.streaming.MemoryStreamBase;
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream$;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.test.TestSparkSession;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Metadata;
import org.scalactic.source.Position;
import org.scalatest.Tag;
import org.scalatest.time.SpanSugar$;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaSinkSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001Q3AAB\u0004\u0001%!)q\u0003\u0001C\u00011!)!\u0004\u0001C)7!)!\u0005\u0001C)G!)\u0011\b\u0001C)u!)A\n\u0001C)\u001b\nA2*\u00194lC\u000e{g\u000e^5ok>,8oU5oWN+\u0018\u000e^3\u000b\u0005!I\u0011\u0001C6bM.\f\u0007'\r\u0019\u000b\u0005)Y\u0011aA:rY*\u0011A\"D\u0001\u0006gB\f'o\u001b\u0006\u0003\u001d=\ta!\u00199bG\",'\"\u0001\t\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001\u0019\u0002C\u0001\u000b\u0016\u001b\u00059\u0011B\u0001\f\b\u0005mY\u0015MZ6b'&t7n\u0015;sK\u0006l\u0017N\\4Tk&$XMQ1tK\u00061A(\u001b8jiz\"\u0012!\u0007\t\u0003)\u0001\t!c\u0019:fCR,7\u000b]1sWN+7o]5p]V\tA\u0004\u0005\u0002\u001eA5\taD\u0003\u0002 \u0013\u0005!A/Z:u\u0013\t\tcD\u0001\tUKN$8\u000b]1sWN+7o]5p]\u0006\u00112M]3bi\u0016lU-\\8ssN#(/Z1n)\u0005!\u0003cA\u0013+Y5\taE\u0003\u0002(Q\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003S%\t\u0011\"\u001a=fGV$\u0018n\u001c8\n\u0005-2#\u0001E'f[>\u0014\u0018p\u0015;sK\u0006l')Y:f!\ticG\u0004\u0002/iA\u0011qFM\u0007\u0002a)\u0011\u0011'E\u0001\u0007yI|w\u000e\u001e \u000b\u0003M\nQa]2bY\u0006L!!\u000e\u001a\u0002\rA\u0013X\rZ3g\u0013\t9\u0004H\u0001\u0004TiJLgn\u001a\u0006\u0003kI\nAB^3sS\u001aL(+Z:vYR$\"aO#\u0015\u0005q\u0002\u0005CA\u001f?\u001b\u0005\u0011\u0014BA 3\u0005\u0011)f.\u001b;\t\r\u0005#A\u00111\u0001C\u0003!1XM]5gs\u001as\u0007cA\u001fDy%\u0011AI\r\u0002\ty\tLh.Y7f}!)a\t\u0002a\u0001\u000f\u00061qO]5uKJ\u0004\"\u0001\u0013&\u000e\u0003%S!aJ\u0005\n\u0005-K%AD*ue\u0016\fW.\u001b8h#V,'/_\u0001\u000fI\u00164\u0017-\u001e7u)JLwmZ3s+\u0005q\u0005cA\u001fP#&\u0011\u0001K\r\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005!\u0013\u0016BA*J\u0005\u001d!&/[4hKJ\u0004")
/* loaded from: input_file:org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.class */
public class KafkaContinuousSinkSuite extends KafkaSinkStreamingSuiteBase {
    @Override // org.apache.spark.sql.kafka010.KafkaSinkSuiteBase
    public TestSparkSession createSparkSession() {
        return new TestSparkSession(new SparkContext("local[10]", "continuous-stream-test-sql-context", sparkConf().set("spark.sql.testkey", "true")));
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSinkStreamingSuiteBase
    public MemoryStreamBase<String> createMemoryStream() {
        return ContinuousMemoryStream$.MODULE$.singlePartition(testImplicits().newStringEncoder(), sqlContext());
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSinkStreamingSuiteBase
    public void verifyResult(StreamingQuery streamingQuery, Function0<BoxedUnit> function0) {
        eventually(timeout(streamingTimeout()), interval(SpanSugar$.MODULE$.convertIntToGrainOfTime(5).seconds()), function0, new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 340));
    }

    @Override // org.apache.spark.sql.kafka010.KafkaSinkStreamingSuiteBase
    public Option<Trigger> defaultTrigger() {
        return new Some(Trigger.Continuous(1000L));
    }

    public KafkaContinuousSinkSuite() {
        test("generic - write big data with small producer buffer", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            String newTopic = this.newTopic();
            KafkaTestUtils testUtils = this.testUtils();
            testUtils.createTopic(newTopic, 1, testUtils.createTopic$default$3());
            HashMap hashMap = new HashMap();
            hashMap.put("bootstrap.servers", this.testUtils().brokerAddress());
            hashMap.put("buffer.memory", "16384");
            hashMap.put("block.on.buffer.full", "true");
            hashMap.put("key.serializer", ByteArraySerializer.class.getName());
            hashMap.put("value.serializer", ByteArraySerializer.class.getName());
            Seq$ seq$ = Seq$.MODULE$;
            Predef$ predef$ = Predef$.MODULE$;
            BinaryType$ binaryType$ = BinaryType$.MODULE$;
            boolean apply$default$3 = AttributeReference$.MODULE$.apply$default$3();
            Metadata apply$default$4 = AttributeReference$.MODULE$.apply$default$4();
            Seq apply = seq$.apply(predef$.wrapRefArray(new AttributeReference[]{new AttributeReference("value", binaryType$, apply$default$3, apply$default$4, AttributeReference$.MODULE$.apply$default$5("value", binaryType$, apply$default$3, apply$default$4), AttributeReference$.MODULE$.apply$default$6("value", binaryType$, apply$default$3, apply$default$4))}));
            byte[] bArr = new byte[15000];
            KafkaDataWriter kafkaDataWriter = new KafkaDataWriter(new Some(newTopic), hashMap, apply);
            try {
                DataType[] dataTypeArr = {BinaryType$.MODULE$};
                UnsafeProjection create = UnsafeProjection$.MODULE$.create(dataTypeArr);
                SpecificInternalRow specificInternalRow = new SpecificInternalRow(Predef$.MODULE$.wrapRefArray(dataTypeArr));
                specificInternalRow.update(0, bArr);
                Seq$.MODULE$.fill(1000, () -> {
                    return create.apply(specificInternalRow);
                }).iterator().foreach(unsafeRow -> {
                    kafkaDataWriter.write(unsafeRow);
                    return BoxedUnit.UNIT;
                });
                return kafkaDataWriter.commit();
            } finally {
                kafkaDataWriter.close();
            }
        }, new Position("KafkaSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 347));
    }
}
