/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FluoOutputFormat
extends OutputFormat<Loader, NullWritable> {
    private static String PROPS_CONF_KEY = FluoOutputFormat.class.getName() + ".props";

    public void checkOutputSpecs(JobContext arg0) throws IOException, InterruptedException {
    }

    public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException, InterruptedException {
        return new OutputCommitter(){

            public void setupTask(TaskAttemptContext context) throws IOException {
            }

            public void setupJob(JobContext context) throws IOException {
            }

            public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
                return false;
            }

            public void commitTask(TaskAttemptContext context) throws IOException {
            }

            public void abortTask(TaskAttemptContext context) throws IOException {
            }
        };
    }

    public RecordWriter<Loader, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        ByteArrayInputStream bais = new ByteArrayInputStream(context.getConfiguration().get(PROPS_CONF_KEY).getBytes(StandardCharsets.UTF_8));
        FluoConfiguration config = new FluoConfiguration((InputStream)bais);
        try {
            final FluoClient client = FluoFactory.newClient((SimpleConfiguration)config);
            final LoaderExecutor lexecutor = client.newLoaderExecutor();
            return new RecordWriter<Loader, NullWritable>(){

                public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                    try {
                        lexecutor.close();
                    }
                    finally {
                        client.close();
                    }
                }

                public void write(Loader loader, NullWritable nullw) throws IOException, InterruptedException {
                    lexecutor.execute(loader);
                }
            };
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    public static void configure(Job conf, SimpleConfiguration props) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            props.save((OutputStream)baos);
            conf.getConfiguration().set(PROPS_CONF_KEY, new String(baos.toByteArray(), StandardCharsets.UTF_8));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

