/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.example.client;

import java.io.File;
import java.io.FileWriter;
import java.io.Serializable;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.deployment.executors.LocalExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.legacy.io.TextInputFormat;
import org.apache.flink.streaming.api.legacy.io.TextOutputFormat;
import org.apache.flink.test.testfunctions.Tokenizer;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class LocalExecutorITCase
extends TestLogger {
    private static final int parallelism = 4;
    private MiniCluster miniCluster;
    private LocalExecutor executor;

    @Before
    public void before() {
        this.executor = LocalExecutor.createWithFactory((Configuration)new Configuration(), config -> {
            this.miniCluster = new MiniCluster(config);
            return this.miniCluster;
        });
    }

    @Test(timeout=60000L)
    public void testLocalExecutorWithWordCount() throws InterruptedException {
        try {
            File inFile = File.createTempFile("wctext", ".in");
            File outFile = File.createTempFile("wctext", ".out");
            inFile.deleteOnExit();
            outFile.deleteOnExit();
            try (FileWriter fw = new FileWriter(inFile);){
                fw.write("Goethe - Faust: Der Tragoedie erster Teil\nProlog im Himmel.\nDer Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\nErzengel treten vor.\nRAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\nUnd ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\ngibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\nhohen Werke Sind herrlich wie am ersten Tag.\nGABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\nPracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\nschaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\nFels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\nMICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\naufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\nDa flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\ndeine Boten, Herr, verehren Das sanfte Wandeln deines Tags.\nZU DREI: Der Anblick gibt den Engeln Staerke, Da keiner dich ergruenden\nmag, Und alle deine hohen Werke Sind herrlich wie am ersten Tag.\nMEPHISTOPHELES: Da du, o Herr, dich einmal wieder nahst Und fragst, wie\nalles sich bei uns befinde, Und du mich sonst gewoehnlich gerne sahst, So\nsiehst du mich auch unter dem Gesinde. Verzeih, ich kann nicht hohe Worte\nmachen, Und wenn mich auch der ganze Kreis verhoehnt; Mein Pathos braechte\ndich gewiss zum Lachen, Haettst du dir nicht das Lachen abgewoehnt. Von\nSonn' und Welten weiss ich nichts zu sagen, Ich sehe nur, wie sich die\nMenschen plagen. Der kleine Gott der Welt bleibt stets von gleichem\nSchlag, Und ist so wunderlich als wie am ersten Tag. Ein wenig besser\nwuerd er leben, Haettst du ihm nicht den Schein des Himmelslichts gegeben;\nEr nennt's Vernunft und braucht's allein, Nur tierischer als jedes Tier\nzu sein. Er scheint mir, mit Verlaub von euer Gnaden, Wie eine der\nlangbeinigen Zikaden, Die immer fliegt und fliegend springt Und gleich im\nGras ihr altes Liedchen singt; Und laeg er nur noch immer in dem Grase! In\njeden Quark begraebt er seine Nase.\nDER HERR: Hast du mir weiter nichts zu sagen? Kommst du nur immer\nanzuklagen? Ist auf der Erde ewig dir nichts recht?\nMEPHISTOPHELES: Nein Herr! ich find es dort, wie immer, herzlich\nschlecht. Die Menschen dauern mich in ihren Jammertagen, Ich mag sogar\ndie armen selbst nicht plagen.\nDER HERR: Kennst du den Faust?\nMEPHISTOPHELES: Den Doktor?\nDER HERR: Meinen Knecht!\nMEPHISTOPHELES: Fuerwahr! er dient Euch auf besondre Weise. Nicht irdisch\nist des Toren Trank noch Speise. Ihn treibt die Gaerung in die Ferne, Er\nist sich seiner Tollheit halb bewusst; Vom Himmel fordert er die schoensten\nSterne Und von der Erde jede hoechste Lust, Und alle Naeh und alle Ferne\nBefriedigt nicht die tiefbewegte Brust.\nDER HERR: Wenn er mir auch nur verworren dient, So werd ich ihn bald in\ndie Klarheit fuehren. Weiss doch der Gaertner, wenn das Baeumchen gruent, Das\nBluet und Frucht die kuenft'gen Jahre zieren.\nMEPHISTOPHELES: Was wettet Ihr? den sollt Ihr noch verlieren! Wenn Ihr\nmir die Erlaubnis gebt, Ihn meine Strasse sacht zu fuehren.\nDER HERR: Solang er auf der Erde lebt, So lange sei dir's nicht verboten,\nEs irrt der Mensch so lang er strebt.\nMEPHISTOPHELES: Da dank ich Euch; denn mit den Toten Hab ich mich niemals\ngern befangen. Am meisten lieb ich mir die vollen, frischen Wangen. Fuer\neinem Leichnam bin ich nicht zu Haus; Mir geht es wie der Katze mit der Maus.\nDER HERR: Nun gut, es sei dir ueberlassen! Zieh diesen Geist von seinem\nUrquell ab, Und fuehr ihn, kannst du ihn erfassen, Auf deinem Wege mit\nherab, Und steh beschaemt, wenn du bekennen musst: Ein guter Mensch, in\nseinem dunklen Drange, Ist sich des rechten Weges wohl bewusst.\nMEPHISTOPHELES: Schon gut! nur dauert es nicht lange. Mir ist fuer meine\nWette gar nicht bange. Wenn ich zu meinem Zweck gelange, Erlaubt Ihr mir\nTriumph aus voller Brust. Staub soll er fressen, und mit Lust, Wie meine\nMuhme, die beruehmte Schlange.\nDER HERR: Du darfst auch da nur frei erscheinen; Ich habe deinesgleichen\nnie gehasst. Von allen Geistern, die verneinen, ist mir der Schalk am\nwenigsten zur Last. Des Menschen Taetigkeit kann allzu leicht erschlaffen,\ner liebt sich bald die unbedingte Ruh; Drum geb ich gern ihm den Gesellen\nzu, Der reizt und wirkt und muss als Teufel schaffen. Doch ihr, die echten\nGoettersoehne, Erfreut euch der lebendig reichen Schoene! Das Werdende, das\newig wirkt und lebt, Umfass euch mit der Liebe holden Schranken, Und was\nin schwankender Erscheinung schwebt, Befestigt mit dauernden Gedanken!\n(Der Himmel schliesst, die Erzengel verteilen sich.)\nMEPHISTOPHELES (allein): Von Zeit zu Zeit seh ich den Alten gern, Und\nhuete mich, mit ihm zu brechen. Es ist gar huebsch von einem grossen Herrn,\nSo menschlich mit dem Teufel selbst zu sprechen.");
            }
            Configuration config = new Configuration();
            config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, (Object)true);
            config.set(DeploymentOptions.ATTACHED, (Object)true);
            StreamGraph wcStreamGraph = this.getWordCountStreamGraph(inFile, outFile, 4);
            JobClient jobClient = (JobClient)this.executor.execute((Pipeline)wcStreamGraph, config, ClassLoader.getSystemClassLoader()).get();
            jobClient.getJobExecutionResult().get();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        MatcherAssert.assertThat((Object)this.miniCluster.isRunning(), (Matcher)Matchers.is((Object)false));
    }

    @Test(timeout=60000L)
    public void testMiniClusterShutdownOnErrors() throws Exception {
        StreamGraph runtimeExceptionPlan = this.getRuntimeExceptionPlan();
        Configuration config = new Configuration();
        config.set(DeploymentOptions.ATTACHED, (Object)true);
        JobClient jobClient = (JobClient)this.executor.execute((Pipeline)runtimeExceptionPlan, config, ClassLoader.getSystemClassLoader()).get();
        CommonTestUtils.assertThrows((String)"Job execution failed.", Exception.class, () -> jobClient.getJobExecutionResult().get());
        MatcherAssert.assertThat((Object)this.miniCluster.isRunning(), (Matcher)Matchers.is((Object)false));
    }

    private StreamGraph getWordCountStreamGraph(File inFile, File outFile, int parallelism) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.createInput((InputFormat)new TextInputFormat(new Path(inFile.getAbsolutePath()))).flatMap((FlatMapFunction)new Tokenizer()).keyBy((KeySelector & Serializable)x -> (String)x.f0).sum(1).addSink((SinkFunction)new OutputFormatSinkFunction((OutputFormat)new TextOutputFormat(new Path(outFile.getAbsolutePath()))));
        return env.getStreamGraph();
    }

    private StreamGraph getRuntimeExceptionPlan() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromData((Object[])new Integer[]{1}).map((MapFunction & Serializable)element -> {
            if (element == 1) {
                throw new RuntimeException("oups");
            }
            return element;
        }).sinkTo((Sink)new DiscardingSink());
        return env.getStreamGraph();
    }
}

