package org.apache.spark.streaming.flume;

import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
import org.apache.spark.streaming.flume.sink.SparkSink;
import org.apache.spark.streaming.flume.sink.SparkSinkConfig$;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;

/* compiled from: PollingFlumeTestUtils.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%c!B\u0001\u0003\u0001\ta!!\u0006)pY2Lgn\u001a$mk6,G+Z:u+RLGn\u001d\u0006\u0003\u0007\u0011\tQA\u001a7v[\u0016T!!\u0002\u0004\u0002\u0013M$(/Z1nS:<'BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0014\u0005\u0001i\u0001C\u0001\b\u0012\u001b\u0005y!\"\u0001\t\u0002\u000bM\u001c\u0017\r\\1\n\u0005Iy!AB!osJ+g\rC\u0003\u0015\u0001\u0011\u0005a#\u0001\u0004=S:LGOP\u0002\u0001)\u00059\u0002C\u0001\r\u0001\u001b\u0005\u0011\u0001b\u0002\u000e\u0001\u0005\u0004%IaG\u0001\u000bE\u0006$8\r[\"pk:$X#\u0001\u000f\u0011\u00059i\u0012B\u0001\u0010\u0010\u0005\rIe\u000e\u001e\u0005\u0007A\u0001\u0001\u000b\u0011\u0002\u000f\u0002\u0017\t\fGo\u00195D_VtG\u000f\t\u0005\bE\u0001\u0011\r\u0011\"\u0001\u001c\u00039)g/\u001a8ugB+'OQ1uG\"Da\u0001\n\u0001!\u0002\u0013a\u0012aD3wK:$8\u000fU3s\u0005\u0006$8\r\u001b\u0011\t\u000f\u0019\u0002!\u0019!C\u00057\u0005)Bo\u001c;bY\u00163XM\u001c;t!\u0016\u00148\t[1o]\u0016d\u0007B\u0002\u0015\u0001A\u0003%A$\u0001\fu_R\fG.\u0012<f]R\u001c\b+\u001a:DQ\u0006tg.\u001a7!\u0011\u001dQ\u0003A1A\u0005\nm\tqb\u00195b]:,GnQ1qC\u000eLG/\u001f\u0005\u0007Y\u0001\u0001\u000b\u0011\u0002\u000f\u0002!\rD\u0017M\u001c8fY\u000e\u000b\u0007/Y2jif\u0004\u0003\"\u0002\u0018\u0001\t\u0003Y\u0012AD4fiR{G/\u00197Fm\u0016tGo\u001d\u0005\ba\u0001\u0011\r\u0011\"\u00032\u0003!\u0019\u0007.\u00198oK2\u001cX#\u0001\u001a\u0011\u0007MB$(D\u00015\u0015\t)d'A\u0004nkR\f'\r\\3\u000b\u0005]z\u0011AC2pY2,7\r^5p]&\u0011\u0011\b\u000e\u0002\f\u0003J\u0014\u0018-\u001f\"vM\u001a,'\u000f\u0005\u0002<\u007f5\tAH\u0003\u0002>}\u000591\r[1o]\u0016d'BA\u0002\t\u0013\t\u0001EHA\u0007NK6|'/_\"iC:tW\r\u001c\u0005\u0007\u0005\u0002\u0001\u000b\u0011\u0002\u001a\u0002\u0013\rD\u0017M\u001c8fYN\u0004\u0003b\u0002#\u0001\u0005\u0004%I!R\u0001\u0006g&t7n]\u000b\u0002\rB\u00191\u0007O$\u0011\u0005![U\"A%\u000b\u0005)\u0013\u0011\u0001B:j].L!\u0001T%\u0003\u0013M\u0003\u0018M]6TS:\\\u0007B\u0002(\u0001A\u0003%a)\u0001\u0004tS:\\7\u000f\t\u0005\u0006!\u0002!\t!U\u0001\u0010gR\f'\u000f^*j]\u001edWmU5oWR\tA\u0004C\u0003T\u0001\u0011\u0005A+\u0001\nti\u0006\u0014H/T;mi&\u0004H.Z*j].\u001cH#A+\u0011\u0007YsFD\u0004\u0002X9:\u0011\u0001lW\u0007\u00023*\u0011!,F\u0001\u0007yI|w\u000e\u001e \n\u0003AI!!X\b\u0002\u000fA\f7m[1hK&\u0011q\f\u0019\u0002\u0004'\u0016\f(BA/\u0010\u0011\u0015\u0011\u0007\u0001\"\u0001d\u0003\u001d\u001aXM\u001c3ECR\f\u0017I\u001c3F]N,(/Z!mY\u0012\u000bG/\u0019%bg\n+WM\u001c*fG\u0016Lg/\u001a3\u0015\u0003\u0011\u0004\"AD3\n\u0005\u0019|!\u0001B+oSRDQ\u0001\u001b\u0001\u0005\u0002%\fA\"Y:tKJ$x*\u001e;qkR$2\u0001\u001a6\u007f\u0011\u0015Yw\r1\u0001m\u00035yW\u000f\u001e9vi\"+\u0017\rZ3sgB\u0019QN\u001d;\u000e\u00039T!a\u001c9\u0002\tU$\u0018\u000e\u001c\u0006\u0002c\u0006!!.\u0019<b\u0013\t\u0019hN\u0001\u0003MSN$\b\u0003B7vo^L!A\u001e8\u0003\u00075\u000b\u0007\u000f\u0005\u0002yw:\u0011a\"_\u0005\u0003u>\ta\u0001\u0015:fI\u00164\u0017B\u0001?~\u0005\u0019\u0019FO]5oO*\u0011!p\u0004\u0005\u0007\u007f\u001e\u0004\r!!\u0001\u0002\u0019=,H\u000f];u\u0005>$\u0017.Z:\u0011\u00075\u0014x\u000f\u0003\u0004\u0002\u0006\u0001!\taY\u0001\u0017CN\u001cXM\u001d;DQ\u0006tg.\u001a7t\u0003J,W)\u001c9us\"9\u0011\u0011\u0002\u0001\u0005\n\u0005-\u0011\u0001F1tg\u0016\u0014Ho\u00115b]:,G.S:F[B$\u0018\u0010F\u0002e\u0003\u001bAa!PA\u0004\u0001\u0004Q\u0004BBA\t\u0001\u0011\u00051-A\u0003dY>\u001cXM\u0002\u0004\u0002\u0016\u0001!\u0011q\u0003\u0002\r)bt7+\u001e2nSR$XM]\n\u0007\u0003'\tI\"!\n\u0011\t\u0005m\u0011\u0011E\u0007\u0003\u0003;Q1!a\bq\u0003\u0011a\u0017M\\4\n\t\u0005\r\u0012Q\u0004\u0002\u0007\u001f\nTWm\u0019;\u0011\r\u0005\u001d\u0012QFA\u0019\u001b\t\tICC\u0002\u0002,9\f!bY8oGV\u0014(/\u001a8u\u0013\u0011\ty#!\u000b\u0003\u0011\r\u000bG\u000e\\1cY\u0016\u0004B!a\u0007\u00024%!\u0011QGA\u000f\u0005\u00111v.\u001b3\t\u0013u\n\u0019B!A!\u0002\u0013Q\u0004b\u0002\u000b\u0002\u0014\u0011\u0005\u00111\b\u000b\u0005\u0003{\t\t\u0005\u0005\u0003\u0002@\u0005MQ\"\u0001\u0001\t\ru\nI\u00041\u0001;\u0011!\t)%a\u0005\u0005B\u0005\u001d\u0013\u0001B2bY2$\"!!\r")
/* loaded from: input_file:org/apache/spark/streaming/flume/PollingFlumeTestUtils.class */
public class PollingFlumeTestUtils {
    private final int org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount = 5;
    private final int eventsPerBatch = 100;
    private final int org$apache$spark$streaming$flume$PollingFlumeTestUtils$$totalEventsPerChannel = org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount() * eventsPerBatch();
    private final int channelCapacity = SpoolDirectorySourceConfigurationConstants.DEFAULT_BUFFER_MAX_LINE_LENGTH;
    private final ArrayBuffer<MemoryChannel> org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels = new ArrayBuffer<>();
    private final ArrayBuffer<SparkSink> sinks = new ArrayBuffer<>();

    /* compiled from: PollingFlumeTestUtils.scala */
    /* loaded from: input_file:org/apache/spark/streaming/flume/PollingFlumeTestUtils$TxnSubmitter.class */
    public class TxnSubmitter implements Callable<Void> {
        public final MemoryChannel org$apache$spark$streaming$flume$PollingFlumeTestUtils$TxnSubmitter$$channel;
        public final /* synthetic */ PollingFlumeTestUtils $outer;

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() {
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), org$apache$spark$streaming$flume$PollingFlumeTestUtils$TxnSubmitter$$$outer().org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount()).foreach$mVc$sp(new PollingFlumeTestUtils$TxnSubmitter$$anonfun$call$1(this, new IntRef(0)));
            return null;
        }

        public /* synthetic */ PollingFlumeTestUtils org$apache$spark$streaming$flume$PollingFlumeTestUtils$TxnSubmitter$$$outer() {
            return this.$outer;
        }

        public TxnSubmitter(PollingFlumeTestUtils pollingFlumeTestUtils, MemoryChannel memoryChannel) {
            this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$TxnSubmitter$$channel = memoryChannel;
            if (pollingFlumeTestUtils == null) {
                throw new NullPointerException();
            }
            this.$outer = pollingFlumeTestUtils;
        }
    }

    public int org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount() {
        return this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount;
    }

    public int eventsPerBatch() {
        return this.eventsPerBatch;
    }

    public int org$apache$spark$streaming$flume$PollingFlumeTestUtils$$totalEventsPerChannel() {
        return this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$$totalEventsPerChannel;
    }

    private int channelCapacity() {
        return this.channelCapacity;
    }

    public int getTotalEvents() {
        return org$apache$spark$streaming$flume$PollingFlumeTestUtils$$totalEventsPerChannel() * org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().size();
    }

    public ArrayBuffer<MemoryChannel> org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels() {
        return this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels;
    }

    private ArrayBuffer<SparkSink> sinks() {
        return this.sinks;
    }

    public int startSingleSink() {
        org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().clear();
        sinks().clear();
        Context context = new Context();
        context.put("capacity", BoxesRunTime.boxToInteger(channelCapacity()).toString());
        context.put("transactionCapacity", "1000");
        context.put("keep-alive", "0");
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, context);
        SparkSink sparkSink = new SparkSink();
        context.put(SparkSinkConfig$.MODULE$.CONF_HOSTNAME(), "localhost");
        context.put(SparkSinkConfig$.MODULE$.CONF_PORT(), String.valueOf(0));
        Configurables.configure(sparkSink, context);
        sparkSink.setChannel(memoryChannel);
        sparkSink.start();
        org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().$plus$eq(memoryChannel);
        sinks().$plus$eq(sparkSink);
        return sparkSink.getPort();
    }

    public Seq<Object> startMultipleSinks() {
        org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().clear();
        sinks().clear();
        Context context = new Context();
        context.put("capacity", BoxesRunTime.boxToInteger(channelCapacity()).toString());
        context.put("transactionCapacity", "1000");
        context.put("keep-alive", "0");
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, context);
        MemoryChannel memoryChannel2 = new MemoryChannel();
        Configurables.configure(memoryChannel2, context);
        SparkSink sparkSink = new SparkSink();
        context.put(SparkSinkConfig$.MODULE$.CONF_HOSTNAME(), "localhost");
        context.put(SparkSinkConfig$.MODULE$.CONF_PORT(), String.valueOf(0));
        Configurables.configure(sparkSink, context);
        sparkSink.setChannel(memoryChannel);
        sparkSink.start();
        SparkSink sparkSink2 = new SparkSink();
        context.put(SparkSinkConfig$.MODULE$.CONF_HOSTNAME(), "localhost");
        context.put(SparkSinkConfig$.MODULE$.CONF_PORT(), String.valueOf(0));
        Configurables.configure(sparkSink2, context);
        sparkSink2.setChannel(memoryChannel2);
        sparkSink2.start();
        sinks().$plus$eq(sparkSink);
        sinks().$plus$eq(sparkSink2);
        org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().$plus$eq(memoryChannel);
        org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().$plus$eq(memoryChannel2);
        return (Seq) sinks().map(new PollingFlumeTestUtils$$anonfun$startMultipleSinks$1(this), ArrayBuffer$.MODULE$.canBuildFrom());
    }

    public void sendDataAndEnsureAllDataHasBeenReceived() {
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(Executors.newCachedThreadPool());
        CountDownLatch countDownLatch = new CountDownLatch(org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount() * org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().size());
        sinks().foreach(new PollingFlumeTestUtils$$anonfun$sendDataAndEnsureAllDataHasBeenReceived$1(this, countDownLatch));
        org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().foreach(new PollingFlumeTestUtils$$anonfun$sendDataAndEnsureAllDataHasBeenReceived$2(this, executorCompletionService));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().size()).foreach(new PollingFlumeTestUtils$$anonfun$sendDataAndEnsureAllDataHasBeenReceived$3(this, executorCompletionService));
        countDownLatch.await(15L, TimeUnit.SECONDS);
    }

    public void assertOutput(List<Map<String, String>> list, List<String> list2) {
        Predef$.MODULE$.require(list.size() == list2.size());
        int size = list.size();
        if (size != org$apache$spark$streaming$flume$PollingFlumeTestUtils$$totalEventsPerChannel() * org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().size()) {
            throw new AssertionError(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Expected ", " events, but was ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(org$apache$spark$streaming$flume$PollingFlumeTestUtils$$totalEventsPerChannel() * org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().size()), BoxesRunTime.boxToInteger(size)})));
        }
        IntRef intRef = new IntRef(0);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().size()).foreach$mVc$sp(new PollingFlumeTestUtils$$anonfun$assertOutput$1(this, list, list2, size, intRef));
        if (intRef.elem != org$apache$spark$streaming$flume$PollingFlumeTestUtils$$totalEventsPerChannel() * org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().size()) {
            throw new AssertionError(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"111 Expected ", " events, but was ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(org$apache$spark$streaming$flume$PollingFlumeTestUtils$$totalEventsPerChannel() * org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().size()), BoxesRunTime.boxToInteger(intRef.elem)})));
        }
    }

    public void assertChannelsAreEmpty() {
        org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().foreach(new PollingFlumeTestUtils$$anonfun$assertChannelsAreEmpty$1(this));
    }

    public void org$apache$spark$streaming$flume$PollingFlumeTestUtils$$assertChannelIsEmpty(MemoryChannel memoryChannel) {
        Field declaredField = memoryChannel.getClass().getDeclaredField("queueRemaining");
        declaredField.setAccessible(true);
        if (BoxesRunTime.unboxToInt(declaredField.get(memoryChannel).getClass().getDeclaredMethod("availablePermits", new Class[0]).invoke(declaredField.get(memoryChannel), new Object[0])) != channelCapacity()) {
            throw new AssertionError(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Channel ", " is not empty"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{memoryChannel.getName()})));
        }
    }

    public void close() {
        sinks().foreach(new PollingFlumeTestUtils$$anonfun$close$1(this));
        sinks().clear();
        org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().foreach(new PollingFlumeTestUtils$$anonfun$close$2(this));
        org$apache$spark$streaming$flume$PollingFlumeTestUtils$$channels().clear();
    }
}
