package org.apache.beam.runners.flink;

import com.google.common.base.Joiner;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.Assert;

/**
 * Integration test that sends {@link #EXPECTED_RESULT} through a custom {@link Sink} on a batch
 * Flink test pipeline and then compares the lines found under {@link #resultPath} with that input.
 */
public class WriteSinkITCase extends JavaProgramTestBase {
    /** Output location; assigned in {@link #preSubmit()} and read by the other hooks. */
    protected String resultPath;

    /** Lines fed into the pipeline and expected back in the sink's output. */
    static final String[] EXPECTED_RESULT = {"Joe red 3", "Mary blue 4", "Max yellow 23"};

    /**
     * Minimal {@link Sink} that writes every element as one text line into a file located under
     * the result path given at construction time.
     */
    public static class MyCustomSink extends Sink<String> {
        private final String resultPath;

        /**
         * Write operation of {@link MyCustomSink}. Initialization and finalization are no-ops;
         * all the work is done by the writers it creates.
         */
        public class MyWriteOperation extends Sink.WriteOperation<String, String> {

            /** Writer that prints each record as a line into a single file. */
            private class MyWriter extends Sink.Writer<String, String> {
                /** Target of {@link #write(String)}; set by {@link #open(String)}. */
                private PrintWriter internalWriter;

                private MyWriter() {
                }

                /**
                 * Creates the file {@code <resultPath>/<str>} and opens a writer on it.
                 *
                 * @param str name of the file to create below the sink's result path (presumably a
                 *     unique id handed in by the Write transform; confirm against the Sink API)
                 * @throws Exception if the file cannot be created (creation is requested with
                 *     overwrite disabled) or cannot be opened for writing
                 */
                public void open(String str) throws Exception {
                    Path path = new Path(MyCustomSink.this.resultPath + "/" + str);
                    // create() hands back an open output stream. It is only needed for its side
                    // effect of creating the file, so close it right away instead of leaking the
                    // handle; the actual writing goes through the PrintWriter below.
                    FileSystem.get(new URI("file:///")).create(path, false).close();
                    this.internalWriter = new PrintWriter(new File(path.toUri()));
                }

                /**
                 * Appends one record as a line to the file opened by {@link #open(String)}.
                 *
                 * @param str the record to write
                 */
                public void write(String str) throws Exception {
                    this.internalWriter.println(str);
                }

                /**
                 * Closes the underlying file writer.
                 *
                 * @return the sink's result path, used as this writer's result
                 */
                public String close() throws Exception {
                    this.internalWriter.close();
                    return MyCustomSink.this.resultPath;
                }

                /** Returns the write operation that created this writer. */
                public Sink.WriteOperation<String, String> getWriteOperation() {
                    return MyWriteOperation.this;
                }
            }

            private MyWriteOperation() {
            }

            /** Writer results are plain strings, so they are encoded as UTF-8 strings. */
            public Coder<String> getWriterResultCoder() {
                return StringUtf8Coder.of();
            }

            /** Nothing to set up before writing. */
            public void initialize(PipelineOptions pipelineOptions) throws Exception {
            }

            /** Nothing to do after writing; the writer results are not inspected. */
            public void finalize(Iterable<String> iterable, PipelineOptions pipelineOptions) throws Exception {
            }

            /** Creates a fresh, not yet opened writer. */
            public Sink.Writer<String, String> createWriter(PipelineOptions pipelineOptions) throws Exception {
                return new MyWriter();
            }

            /** Returns the enclosing sink. */
            public Sink<String> getSink() {
                return MyCustomSink.this;
            }
        }

        /**
         * @param str directory path under which the writers create their files
         */
        public MyCustomSink(String str) {
            this.resultPath = str;
        }

        /** Asserts that pipeline options were supplied. */
        public void validate(PipelineOptions pipelineOptions) {
            Assert.assertNotNull(pipelineOptions);
        }

        /** Creates the write operation bound to this sink. */
        public Sink.WriteOperation<String, ?> createWriteOperation(PipelineOptions pipelineOptions) {
            return new MyWriteOperation();
        }
    }

    /** Picks a result path in the test's temp directory, made distinct via the nano-time suffix. */
    protected void preSubmit() throws Exception {
        this.resultPath = getTempDirPath("result-" + System.nanoTime());
    }

    /** Compares the lines written under {@link #resultPath} with {@link #EXPECTED_RESULT}. */
    protected void postSubmit() throws Exception {
        compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), this.resultPath);
    }

    /** Runs the pipeline against {@link #resultPath}. */
    protected void testProgram() throws Exception {
        runProgram(this.resultPath);
    }

    /**
     * Stops the test cluster, tolerating a failure to delete files during cleanup.
     *
     * @throws Exception any failure other than an {@link IOException} whose message starts with
     *     {@code "Unable to delete file"}
     */
    public void stopCluster() throws Exception {
        try {
            super.stopCluster();
        } catch (IOException e) {
            String message = e.getMessage();
            // Only the cleanup-time delete failure is tolerated: it does not affect what the test
            // verifies. Everything else (including message-less exceptions) is propagated.
            if (message == null || !message.startsWith("Unable to delete file")) {
                throw e;
            }
        }
    }

    /**
     * Builds and runs a batch pipeline that writes {@link #EXPECTED_RESULT} via
     * {@link MyCustomSink}.
     *
     * @param str result path handed to the sink
     */
    private static void runProgram(String str) {
        FlinkTestPipeline createForBatch = FlinkTestPipeline.createForBatch();
        createForBatch.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of()).apply("CustomSink", Write.to(new MyCustomSink(str)));
        createForBatch.run();
    }
}
