package org.apache.oodt.cas.resource.examples;

import java.io.FileNotFoundException;
import java.io.PrintStream;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.oodt.cas.resource.examples.PalindromeUtils;
import org.apache.oodt.cas.resource.structs.JobInput;
import org.apache.oodt.cas.resource.structs.NameValueJobInput;
import org.apache.oodt.cas.resource.structs.StreamingInstance;
import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/* loaded from: input_file:WEB-INF/lib/cas-resource-0.8.1.jar:org/apache/oodt/cas/resource/examples/StreamingPalindromeExample.class */
/**
 * Example streaming job: reads lines of text from a socket, keeps those accepted by
 * {@link PalindromeUtils.FilterPalindrome}, and writes the per-batch count to an output stream.
 *
 * <p>The job reads the following keys from its {@link NameValueJobInput}:
 * <ul>
 *   <li>{@code "time"} - how long to run, in milliseconds (it is passed to {@link Thread#sleep(long)});</li>
 *   <li>{@code "host"} and {@code "port"} - the socket to read text from;</li>
 *   <li>{@link MRJobConfig#OUTPUT} - the destination handed to {@code PalindromeUtils.getPrintStream}.</li>
 * </ul>
 *
 * <p>The contexts must be injected through {@link #setStreamingContext(StreamingContext)} (and
 * {@link #setSparkContext(SparkContext)}) before {@link #execute(JobInput)} is called.
 */
public class StreamingPalindromeExample implements StreamingInstance {
    JavaSparkContext sc;
    JavaStreamingContext ssc;

    /**
     * Runs the streaming palindrome count until the configured time has elapsed.
     *
     * <p>A background thread sleeps for the configured duration, stops the streaming context,
     * writes a final "Stopping after ..." line and then closes the output stream. This method
     * blocks in {@code awaitTermination()} until the context has been stopped.
     *
     * @param jobInput the job parameters; must be a {@link NameValueJobInput}
     * @return {@code true} once the streaming context has terminated, {@code false} if the output
     *         stream could not be opened
     * @throws JobInputException declared by the job interface; not thrown directly by this method
     * @throws NumberFormatException if {@code "time"} or {@code "port"} is not a valid integer
     */
    @Override // org.apache.oodt.cas.resource.structs.JobInstance
    public boolean execute(JobInput jobInput) throws JobInputException {
        NameValueJobInput input = (NameValueJobInput) jobInput;
        final int durationMillis = Integer.parseInt(input.getValue("time"));
        try {
            final PrintStream out = PalindromeUtils.getPrintStream(input.getValue(MRJobConfig.OUTPUT));
            // Until the timer thread is running, this method is responsible for closing 'out'.
            boolean timerOwnsStream = false;
            try {
                String host = input.getValue("host");
                int port = Integer.parseInt(input.getValue("port"));
                this.ssc.socketTextStream(host, port)
                        .filter(new PalindromeUtils.FilterPalindrome())
                        .count()
                        .foreachRDD(new Function<JavaRDD<Long>, Void>() {
                            @Override // org.apache.spark.api.java.function.Function
                            public Void call(JavaRDD<Long> counts) throws Exception {
                                // Hold the stream's monitor so one batch's lines are written together.
                                synchronized (out) {
                                    // JavaRDD.collect() yields a List<Long>; no array cast is needed.
                                    for (Long count : counts.collect()) {
                                        out.println("Found " + count + " palindromes.");
                                    }
                                }
                                return null;
                            }
                        });
                this.ssc.start();

                Thread timer = new Thread(new Runnable() {
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            Thread.sleep(durationMillis);
                        } catch (InterruptedException e) {
                            // Preserve the interrupt status; the context is still stopped below.
                            Thread.currentThread().interrupt();
                        } finally {
                            try {
                                StreamingPalindromeExample.this.ssc.stop();
                                out.println("Stopping after " + (durationMillis / 1000) + " seconds.");
                            } finally {
                                // This thread writes the last line, so it is the one that can
                                // safely release the stream.
                                out.close();
                            }
                        }
                    }
                });
                timer.start();
                timerOwnsStream = true;
            } finally {
                if (!timerOwnsStream) {
                    // Setup failed before the timer thread took over: do not leak the stream.
                    out.close();
                }
            }
            this.ssc.awaitTermination();
            return true;
        } catch (FileNotFoundException ignored) {
            // The output destination could not be opened; report failure through the return value.
            return false;
        }
    }

    /**
     * Wraps the supplied Scala streaming context in a {@link JavaStreamingContext} for later use
     * by {@link #execute(JobInput)}.
     *
     * @param streamingContext the streaming context to wrap
     */
    @Override // org.apache.oodt.cas.resource.structs.StreamingInstance
    public void setStreamingContext(StreamingContext streamingContext) {
        this.ssc = new JavaStreamingContext(streamingContext);
    }

    /**
     * Wraps the supplied Scala Spark context in a {@link JavaSparkContext}.
     *
     * @param sparkContext the Spark context to wrap
     */
    @Override // org.apache.oodt.cas.resource.structs.SparkInstance
    public void setSparkContext(SparkContext sparkContext) {
        this.sc = new JavaSparkContext(sparkContext);
    }
}
