package cascading.tap.local;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.scheme.local.Compressors;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import data.InputData;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.LineNumberReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.Properties;
import org.junit.Test;

/* loaded from: input_file:cascading/tap/local/LocalTapPlatformTest.class */
public class LocalTapPlatformTest extends PlatformTestCase implements Serializable {

    /* loaded from: input_file:cascading/tap/local/LocalTapPlatformTest$SchemeWithProperties.class */
    private static class SchemeWithProperties extends TextLine {
        public SchemeWithProperties(Fields fields) {
            super(fields);
        }

        public void sourceConfInit(FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties properties) {
            if (!"connector-default".equals(flowProcess.getProperty("default"))) {
                throw new RuntimeException("not default value");
            }
            properties.setProperty("replace", "source-replace");
            properties.setProperty("local", "source-local");
            super.sourceConfInit(flowProcess, tap, properties);
        }

        public void sinkConfInit(FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties properties) {
            if (!"connector-default".equals(flowProcess.getProperty("default"))) {
                throw new RuntimeException("not default value");
            }
            properties.setProperty("replace", "sink-replace");
            properties.setProperty("local", "sink-local");
            super.sinkConfInit(flowProcess, tap, properties);
        }

        public void sourcePrepare(FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall) throws IOException {
            if (!"connector-default".equals(flowProcess.getProperty("default"))) {
                throw new RuntimeException("not default value");
            }
            if (!"source-replace".equals(flowProcess.getProperty("replace"))) {
                throw new RuntimeException("not replaced value");
            }
            if (!"source-local".equals(flowProcess.getProperty("local"))) {
                throw new RuntimeException("not local value");
            }
            super.sourcePrepare(flowProcess, sourceCall);
        }

        public void sinkPrepare(FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall) throws IOException {
            if (!"connector-default".equals(flowProcess.getProperty("default"))) {
                throw new RuntimeException("not default value");
            }
            if (!"sink-replace".equals(flowProcess.getProperty("replace"))) {
                throw new RuntimeException("not replaced value");
            }
            if (!"sink-local".equals(flowProcess.getProperty("local"))) {
                throw new RuntimeException("not local value");
            }
            super.sinkPrepare(flowProcess, sinkCall);
        }

        public /* bridge */ /* synthetic */ void sinkConfInit(FlowProcess flowProcess, Tap tap, Object obj) {
            sinkConfInit((FlowProcess<? extends Properties>) flowProcess, (Tap<Properties, InputStream, OutputStream>) tap, (Properties) obj);
        }

        public /* bridge */ /* synthetic */ void sourceConfInit(FlowProcess flowProcess, Tap tap, Object obj) {
            sourceConfInit((FlowProcess<? extends Properties>) flowProcess, (Tap<Properties, InputStream, OutputStream>) tap, (Properties) obj);
        }
    }

    @Test
    public void testIO() {
        System.setIn(new ByteArrayInputStream("line1\nline2\n".getBytes()));
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        System.setOut(new PrintStream(byteArrayOutputStream));
        new LocalFlowConnector().connect(new StdInTap(new TextLine(new Fields(new Comparable[]{"line"}))), new StdOutTap(new TextLine(new Fields(new Comparable[]{"line"}))), new Pipe("io")).complete();
        assertEquals("line1\nline2\n", byteArrayOutputStream.toString());
    }

    @Test
    public void testSourceConfInit() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileNums20);
        Tap tap = getPlatform().getTap(new SchemeWithProperties(new Fields(new Comparable[]{"line"})), InputData.inputFileNums20, SinkMode.KEEP);
        Pipe pipe = new Pipe("test");
        Tap textFile = getPlatform().getTextFile(getOutputPath("sourceconfinit"), SinkMode.REPLACE);
        Properties properties = new Properties();
        properties.setProperty("default", "connector-default");
        properties.setProperty("replace", "connector-replace");
        Flow connect = getPlatform().getFlowConnector(properties).connect(tap, textFile, pipe);
        connect.complete();
        assertTrue(connect.resourceExists(textFile));
    }

    @Test
    public void testSinkConfInit() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileNums20);
        Tap textFile = getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), InputData.inputFileNums20, SinkMode.KEEP);
        Pipe pipe = new Pipe("test");
        Tap tap = getPlatform().getTap(new SchemeWithProperties(new Fields(new Comparable[]{"line"})), getOutputPath("sinkconfinit"), SinkMode.REPLACE);
        Properties properties = new Properties();
        properties.setProperty("default", "connector-default");
        properties.setProperty("replace", "connector-replace");
        Flow connect = getPlatform().getFlowConnector(properties).connect(textFile, tap, pipe);
        connect.complete();
        assertTrue(connect.resourceExists(tap));
    }

    @Test
    public void testDirTap() throws Exception {
        Flow connect = getPlatform().getFlowConnector().connect(new DirTap(new TextLine(), InputData.inputPath, "glob:**/*.txt"), new DirTap(new TextLine(), getOutputPath(), SinkMode.REPLACE), new Pipe("copy"));
        connect.complete();
        assertEquals(674, getSinkAsList(connect).size());
    }

    @Test
    public void testSchemeCompression() throws Exception {
        DirTap dirTap = new DirTap(new TextLine(), InputData.inputPath, "glob:**/*.txt");
        DirTap dirTap2 = new DirTap(new TextLine(Compressors.GZIP), getOutputPath("compressed"), SinkMode.REPLACE);
        DirTap dirTap3 = new DirTap(new TextLine(), getOutputPath("uncompressed"), SinkMode.REPLACE);
        getPlatform().getFlowConnector().connect("first", dirTap, dirTap2, new Pipe("copy")).complete();
        Flow connect = getPlatform().getFlowConnector().connect("second", dirTap2, dirTap3, new Pipe("copy"));
        connect.complete();
        assertEquals(674, getSinkAsList(connect).size());
    }
}
