/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import data.InputData;
import java.io.IOException;
import org.junit.Test;

public class FlowProcessPlatformTest
extends PlatformTestCase {
    public FlowProcessPlatformTest() {
        super(true);
    }

    @Test
    public void testOpenForRead() throws IOException {
        this.getPlatform().copyFromLocal(InputData.inputFileNums20);
        this.getPlatform().copyFromLocal(InputData.inputFileNums10);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), InputData.inputFileNums20);
        Pipe pipe = new Pipe("test");
        Tap tap = this.getPlatform().getTextFile(new Fields(new Comparable[]{"value"}), InputData.inputFileNums10);
        pipe = new Each(pipe, (Function)new IterateInsert(new Fields(new Comparable[]{"value"}), tap), Fields.ALL);
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("openforread"), SinkMode.REPLACE);
        Flow flow = this.getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();
        FlowProcessPlatformTest.validateLength((Flow)flow, (int)200);
    }

    public static class IterateInsert
    extends BaseOperation
    implements Function {
        private Tap tap;

        public IterateInsert(Fields fieldDeclaration, Tap tap) {
            super(fieldDeclaration);
            this.tap = tap;
        }

        public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
        }

        public void cleanup(FlowProcess flowProcess, OperationCall operationCall) {
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            try {
                TupleEntryIterator iterator = flowProcess.openTapForRead(this.tap);
                while (iterator.hasNext()) {
                    functionCall.getOutputCollector().add(new Tuple(((TupleEntry)iterator.next()).getTuple()));
                }
                iterator.close();
            }
            catch (IOException exception) {
                exception.printStackTrace();
            }
        }
    }
}

