/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.streaming.continuous;

import java.io.Serializable;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.sql.LocalSparkSession;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.continuous.CommitPartitionEpoch;
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution;
import org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef$;
import org.apache.spark.sql.execution.streaming.continuous.GetCurrentEpoch$;
import org.apache.spark.sql.execution.streaming.continuous.ReportPartitionOffset;
import org.apache.spark.sql.execution.streaming.continuous.SetReaderPartitions;
import org.apache.spark.sql.execution.streaming.continuous.SetWriterPartitions;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.test.TestSparkSession;
import org.mockito.InOrder;
import org.mockito.Matchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.scalactic.source.Position;
import org.scalatest.Args;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.FunSuiteLike;
import org.scalatest.Status;
import org.scalatest.Tag;
import org.scalatest.mockito.MockitoSugar;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.List$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005Uc\u0001B\u0001\u0003\u0001=\u0011Q#\u00129pG\"\u001cun\u001c:eS:\fGo\u001c:Tk&$XM\u0003\u0002\u0004\t\u0005Q1m\u001c8uS:,x.^:\u000b\u0005\u00151\u0011!C:ue\u0016\fW.\u001b8h\u0015\t9\u0001\"A\u0002tc2T!!\u0003\u0006\u0002\u000bM\u0004\u0018M]6\u000b\u0005-a\u0011AB1qC\u000eDWMC\u0001\u000e\u0003\ry'oZ\u0002\u0001'\u0015\u0001\u0001\u0003\u0006\r!!\t\t\"#D\u0001\t\u0013\t\u0019\u0002BA\u0007Ta\u0006\u00148NR;o'VLG/\u001a\t\u0003+Yi\u0011AB\u0005\u0003/\u0019\u0011\u0011\u0003T8dC2\u001c\u0006/\u0019:l'\u0016\u001c8/[8o!\tIb$D\u0001\u001b\u0015\tYB$A\u0004n_\u000e\\\u0017\u000e^8\u000b\u0005ua\u0011!C:dC2\fG/Z:u\u0013\ty\"D\u0001\u0007N_\u000e\\\u0017\u000e^8Tk\u001e\f'\u000f\u0005\u0002\"E5\tA$\u0003\u0002$9\t\u0011\")\u001a4pe\u0016\fe\u000eZ!gi\u0016\u0014X)Y2i\u0011\u0015)\u0003\u0001\"\u0001'\u0003\u0019a\u0014N\\5u}Q\tq\u0005\u0005\u0002)\u00015\t!\u0001C\u0005+\u0001\u0001\u0007\t\u0019!C\u0005W\u0005\u0001R\r]8dQ\u000e{wN\u001d3j]\u0006$xN]\u000b\u0002YA\u0011Q\u0006M\u0007\u0002])\u0011q\u0006C\u0001\u0004eB\u001c\u0017BA\u0019/\u00059\u0011\u0006oY#oIB|\u0017N\u001c;SK\u001aD\u0011b\r\u0001A\u0002\u0003\u0007I\u0011\u0002\u001b\u0002)\u0015\u0004xn\u00195D_>\u0014H-\u001b8bi>\u0014x\fJ3r)\t)4\b\u0005\u00027s5\tqGC\u00019\u0003\u0015\u00198-\u00197b\u0013\tQtG\u0001\u0003V]&$\bb\u0002\u001f3\u0003\u0003\u0005\r\u0001L\u0001\u0004q\u0012\n\u0004\"\u0003 \u0001\u0001\u0004\u0005\t\u0015)\u0003-\u0003E)\u0007o\\2i\u0007>|'\u000fZ5oCR|'\u000f\t\u0005\n\u0001\u0002\u0001\r\u00111A\u0005\n\u0005\u000baa\u001e:ji\u0016\u0014X#\u0001\"\u0011\u0005\rSU\"\u0001#\u000b\u0005\u0015)%B\u0001!G\u0015\t9\u0005*\u0001\u0002we)\u0011\u0011JB\u0001\bg>,(oY3t\u0013\tYEI\u0001\u0007TiJ,\u0017-\\,sSR,'\u000fC\u0005N\u0001\u0001\u0007\t\u0019!C\u0005\u001d\u0006QqO]5uKJ|F%Z9\u0015\u0005Uz\u0005b\u0002\u001fM\u0003\u0003\u0005\rA\u0011\u0005\n#\u0002\u0001\r\u0011!Q!\n\t\u000bqa\u001e:ji\u0016\u0014\b\u0005C\u0005T\u0001\u0001\u0007\t\u0019!C\u0005)\u0006)\u0011/^3ssV\tQ\u000b\u0005\u0002W76\tqK\u0003\u0002\u00041*\u0011Q!\u0017\u0006\u00035\u001a\t\u0011\"\u001a=fGV$\u0018n\u001c8\n\u0005q;&aE\"p]RLg.^8vg\u0016CXmY;uS>t\u0007\"\u00030\u0001\u0001\u0004\u0005\r\u0011\"\u0003`\u0003%\tX/\u001a:z?\u0012*\u0017\u000f\u0006\u00026A\"9A(XA\u0001\u0002\u0004)\u0006\"\u00032\u0001\u0001\u0004\u0005\t\u0015)\u0003V\u0003\u0019\tX/\u001a:zA!IA\r\u0001a\u0001\u0002\u0004%I!Z\u0001\u000e_J$WM\u001d,fe&4\u0017.\u001a:\u0016\u0003\u0019\u0004\"aZ5\u000e\u0003!T!a\u0007\u0007\n\u0005)D'aB%o\u001fJ$WM\u001d\u0005\nY\u0002\u0001\r\u00111A\u0005\n5\f\u0011c\u001c:eKJ4VM]5gS\u0016\u0014x\fJ3r)\t)d\u000eC\u0004=W\u0006\u0005\t\u0019\u00014\t\u0013A\u0004\u0001\u0019!A!B\u00131\u0017AD8sI\u0016\u0014h+\u001a:jM&,'\u000f\t\u0005\u0006e\u0002!\te]\u0001\u000bE\u00164wN]3FC\u000eDG#A\u001b\t\u000bU\u0004A\u0011\u0002<\u0002'M,Go\u0016:ji\u0016\u0014\b+\u0019:uSRLwN\\:\u0015\u0005U:\b\"\u0002=u\u0001\u0004I\u0018!\u00048v[B\u000b'\u000f^5uS>t7\u000f\u0005\u00027u&\u00111p\u000e\u0002\u0004\u0013:$\b\"B?\u0001\t\u0013q\u0018aE:fiJ+\u0017\rZ3s!\u0006\u0014H/\u001b;j_:\u001cHCA\u001b\u0000\u0011\u0015AH\u00101\u0001z\u0011\u001d\t\u0019\u0001\u0001C\u0005\u0003\u000b\tAcY8n[&$\b+\u0019:uSRLwN\\#q_\u000eDG#B\u001b\u0002\b\u0005-\u0001bBA\u0005\u0003\u0003\u0001\r!_\u0001\fa\u0006\u0014H/\u001b;j_:LE\r\u0003\u0005\u0002\u000e\u0005\u0005\u0001\u0019AA\b\u0003\u0015)\u0007o\\2i!\r1\u0014\u0011C\u0005\u0004\u0003'9$\u0001\u0002'p]\u001eDq!a\u0006\u0001\t\u0013\tI\"A\u000bsKB|'\u000f\u001e)beRLG/[8o\u001f\u001a47/\u001a;\u0015\u000bU\nY\"!\b\t\u000f\u0005%\u0011Q\u0003a\u0001s\"A\u0011QBA\u000b\u0001\u0004\ty\u0001\u0003\u0004\u0002\"\u0001!Ia]\u0001\u0014[\u0006\\WmU=oG\"\u0014xN\\8vg\u000e\u000bG\u000e\u001c\u0005\b\u0003K\u0001A\u0011BA\u0014\u000311XM]5gs\u000e{W.\\5u)\r)\u0014\u0011\u0006\u0005\t\u0003\u001b\t\u0019\u00031\u0001\u0002\u0010!9\u0011Q\u0006\u0001\u0005\n\u0005=\u0012!\u0005<fe&4\u0017PT8D_6l\u0017\u000e\u001e$peR\u0019Q'!\r\t\u0011\u00055\u00111\u0006a\u0001\u0003\u001fAq!!\u000e\u0001\t\u0013\t9$\u0001\fwKJLg-_\"p[6LGo]%o\u001fJ$WM](g)\r)\u0014\u0011\b\u0005\t\u0003w\t\u0019\u00041\u0001\u0002>\u00051Q\r]8dQN\u0004b!a\u0010\u0002P\u0005=a\u0002BA!\u0003\u0017rA!a\u0011\u0002J5\u0011\u0011Q\t\u0006\u0004\u0003\u000fr\u0011A\u0002\u001fs_>$h(C\u00019\u0013\r\tieN\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\t\t&a\u0015\u0003\u0007M+\u0017OC\u0002\u0002N]\u0002")
public class EpochCoordinatorSuite
extends SparkFunSuite
implements LocalSparkSession,
MockitoSugar {
    private RpcEndpointRef epochCoordinator;
    private StreamWriter writer;
    private ContinuousExecution query;
    private InOrder orderVerifier;
    private transient SparkSession spark;

    public <T> T mock(ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, classTag);
    }

    public <T> T mock(Answer<?> defaultAnswer, ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, defaultAnswer, classTag);
    }

    public <T> T mock(MockSettings mockSettings, ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, (MockSettings)mockSettings, classTag);
    }

    public <T> T mock(String name, ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, (String)name, classTag);
    }

    @Override
    public /* synthetic */ void org$apache$spark$sql$LocalSparkSession$$super$beforeAll() {
        super.beforeAll();
    }

    @Override
    public /* synthetic */ void org$apache$spark$sql$LocalSparkSession$$super$afterEach() {
        BeforeAndAfterEach.afterEach$((BeforeAndAfterEach)this);
    }

    @Override
    public void beforeAll() {
        LocalSparkSession.beforeAll$(this);
    }

    @Override
    public void afterEach() {
        LocalSparkSession.afterEach$(this);
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfterEach$$super$runTest(String testName, Args args) {
        return FunSuiteLike.runTest$((FunSuiteLike)this, (String)testName, (Args)args);
    }

    public Status runTest(String testName, Args args) {
        return BeforeAndAfterEach.runTest$((BeforeAndAfterEach)this, (String)testName, (Args)args);
    }

    @Override
    public SparkSession spark() {
        return this.spark;
    }

    @Override
    public void spark_$eq(SparkSession x$1) {
        this.spark = x$1;
    }

    private RpcEndpointRef epochCoordinator() {
        return this.epochCoordinator;
    }

    private void epochCoordinator_$eq(RpcEndpointRef x$1) {
        this.epochCoordinator = x$1;
    }

    private StreamWriter writer() {
        return this.writer;
    }

    private void writer_$eq(StreamWriter x$1) {
        this.writer = x$1;
    }

    private ContinuousExecution query() {
        return this.query;
    }

    private void query_$eq(ContinuousExecution x$1) {
        this.query = x$1;
    }

    private InOrder orderVerifier() {
        return this.orderVerifier;
    }

    private void orderVerifier_$eq(InOrder x$1) {
        this.orderVerifier = x$1;
    }

    public void beforeEach() {
        ContinuousReader reader = (ContinuousReader)this.mock(ClassTag$.MODULE$.apply(ContinuousReader.class));
        this.writer_$eq((StreamWriter)this.mock(ClassTag$.MODULE$.apply(StreamWriter.class)));
        this.query_$eq((ContinuousExecution)this.mock(ClassTag$.MODULE$.apply(ContinuousExecution.class)));
        this.orderVerifier_$eq(Mockito.inOrder((Object[])new Object[]{this.writer(), this.query()}));
        this.spark_$eq(new TestSparkSession());
        this.epochCoordinator_$eq(EpochCoordinatorRef$.MODULE$.create(this.writer(), reader, this.query(), "test", 1L, this.spark(), SparkEnv$.MODULE$.get()));
    }

    private void setWriterPartitions(int numPartitions) {
        this.epochCoordinator().askSync((Object)new SetWriterPartitions(numPartitions), ClassTag$.MODULE$.Unit());
    }

    private void setReaderPartitions(int numPartitions) {
        this.epochCoordinator().askSync((Object)new SetReaderPartitions(numPartitions), ClassTag$.MODULE$.Unit());
    }

    private void commitPartitionEpoch(int partitionId, long epoch) {
        WriterCommitMessage dummyMessage = (WriterCommitMessage)this.mock(ClassTag$.MODULE$.apply(WriterCommitMessage.class));
        this.epochCoordinator().send((Object)new CommitPartitionEpoch(partitionId, epoch, dummyMessage));
    }

    private void reportPartitionOffset(int partitionId, long epoch) {
        PartitionOffset dummyOffset = (PartitionOffset)this.mock(ClassTag$.MODULE$.apply(PartitionOffset.class));
        this.epochCoordinator().send((Object)new ReportPartitionOffset(partitionId, epoch, dummyOffset));
    }

    private void makeSynchronousCall() {
        this.epochCoordinator().askSync((Object)GetCurrentEpoch$.MODULE$, ClassTag$.MODULE$.Long());
    }

    private void verifyCommit(long epoch) {
        ((StreamWriter)this.orderVerifier().verify((Object)this.writer())).commit(Matchers.eq((long)epoch), (WriterCommitMessage[])Matchers.any());
        ((ContinuousExecution)this.orderVerifier().verify((Object)this.query())).commit(epoch);
    }

    private void verifyNoCommitFor(long epoch) {
        ((StreamWriter)Mockito.verify((Object)this.writer(), (VerificationMode)Mockito.never())).commit(Matchers.eq((long)epoch), (WriterCommitMessage[])Matchers.any());
        ((ContinuousExecution)Mockito.verify((Object)this.query(), (VerificationMode)Mockito.never())).commit(epoch);
    }

    private void verifyCommitsInOrderOf(Seq<Object> epochs) {
        epochs.foreach((Function1)(JFunction1.mcVJ.sp & Serializable & scala.Serializable)epoch -> this.verifyCommit(epoch));
    }

    public EpochCoordinatorSuite() {
        BeforeAndAfterEach.$init$((BeforeAndAfterEach)this);
        LocalSparkSession.$init$(this);
        MockitoSugar.$init$((MockitoSugar)this);
        this.test("single epoch", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(3);
            this.setReaderPartitions(2);
            this.commitPartitionEpoch(0, 1L);
            this.commitPartitionEpoch(1, 1L);
            this.commitPartitionEpoch(2, 1L);
            this.reportPartitionOffset(0, 1L);
            this.reportPartitionOffset(1, 1L);
            this.makeSynchronousCall();
            this.verifyCommit(1L);
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 59));
        this.test("single epoch, all but one writer partition has committed", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(3);
            this.setReaderPartitions(2);
            this.commitPartitionEpoch(0, 1L);
            this.commitPartitionEpoch(1, 1L);
            this.reportPartitionOffset(0, 1L);
            this.reportPartitionOffset(1, 1L);
            this.makeSynchronousCall();
            this.verifyNoCommitFor(1L);
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 76));
        this.test("single epoch, all but one reader partition has reported an offset", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(3);
            this.setReaderPartitions(2);
            this.commitPartitionEpoch(0, 1L);
            this.commitPartitionEpoch(1, 1L);
            this.commitPartitionEpoch(2, 1L);
            this.reportPartitionOffset(0, 1L);
            this.makeSynchronousCall();
            this.verifyNoCommitFor(1L);
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 90));
        this.test("consequent epochs, messages for epoch (k + 1) arrive after messages for epoch k", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(2);
            this.setReaderPartitions(2);
            this.commitPartitionEpoch(0, 1L);
            this.commitPartitionEpoch(1, 1L);
            this.reportPartitionOffset(0, 1L);
            this.reportPartitionOffset(1, 1L);
            this.commitPartitionEpoch(0, 2L);
            this.commitPartitionEpoch(1, 2L);
            this.reportPartitionOffset(0, 2L);
            this.reportPartitionOffset(1, 2L);
            this.makeSynchronousCall();
            this.verifyCommitsInOrderOf((Seq<Object>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapLongArray(new long[]{1L, 2L})));
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 104));
        this.test("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(2);
            this.setReaderPartitions(2);
            this.commitPartitionEpoch(0, 1L);
            this.commitPartitionEpoch(1, 1L);
            this.reportPartitionOffset(0, 1L);
            this.commitPartitionEpoch(0, 2L);
            this.commitPartitionEpoch(1, 2L);
            this.reportPartitionOffset(0, 2L);
            this.reportPartitionOffset(1, 2L);
            this.reportPartitionOffset(1, 1L);
            this.makeSynchronousCall();
            this.verifyCommitsInOrderOf((Seq<Object>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapLongArray(new long[]{1L, 2L})));
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 123));
        this.test("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(1);
            this.setReaderPartitions(1);
            this.commitPartitionEpoch(0, 1L);
            this.reportPartitionOffset(0, 1L);
            this.commitPartitionEpoch(0, 3L);
            this.reportPartitionOffset(0, 3L);
            this.commitPartitionEpoch(0, 4L);
            this.reportPartitionOffset(0, 4L);
            this.commitPartitionEpoch(0, 2L);
            this.reportPartitionOffset(0, 2L);
            this.makeSynchronousCall();
            this.verifyCommitsInOrderOf((Seq<Object>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapLongArray(new long[]{1L, 2L, 3L, 4L})));
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 144));
        this.test("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(1);
            this.setReaderPartitions(1);
            this.commitPartitionEpoch(0, 1L);
            this.reportPartitionOffset(0, 1L);
            this.commitPartitionEpoch(0, 3L);
            this.reportPartitionOffset(0, 3L);
            this.commitPartitionEpoch(0, 5L);
            this.reportPartitionOffset(0, 5L);
            this.commitPartitionEpoch(0, 4L);
            this.reportPartitionOffset(0, 4L);
            this.commitPartitionEpoch(0, 2L);
            this.reportPartitionOffset(0, 2L);
            this.makeSynchronousCall();
            this.verifyCommitsInOrderOf((Seq<Object>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapLongArray(new long[]{1L, 2L, 3L, 4L, 5L})));
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 165));
    }
}

