/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class BulkWriterTest
extends TestLogger {
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    @Test
    public void testCustomBulkWriter() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createTestSinkWithBulkEncoder(outDir, 1, 0, 10L, new TestUtils.TupleToStringBucketer(), new TestBulkWriterFactory(), (BucketFactory<Tuple2<String, Integer>, String>)new DefaultBucketFactoryImpl());){
            testHarness.setup();
            testHarness.open();
            testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)Tuple2.of((Object)"test1", (Object)1), 1L));
            TestUtils.checkLocalFs(outDir, 1, 0);
            testHarness.snapshot(1L, 1L);
            testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)Tuple2.of((Object)"test1", (Object)2), 2L));
            testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)Tuple2.of((Object)"test1", (Object)3), 3L));
            testHarness.snapshot(2L, 2L);
            TestUtils.checkLocalFs(outDir, 2, 0);
            Map<File, String> contents = TestUtils.getFileContentByPath(outDir);
            int fileCounter = 0;
            for (Map.Entry<File, String> fileContents : contents.entrySet()) {
                if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
                    ++fileCounter;
                    Assert.assertEquals((Object)"test1@1\n", (Object)fileContents.getValue());
                    continue;
                }
                if (!fileContents.getKey().getName().contains(".part-0-1.inprogress")) continue;
                ++fileCounter;
                Assert.assertEquals((Object)"test1@2\ntest1@3\n", (Object)fileContents.getValue());
            }
            Assert.assertEquals((long)2L, (long)fileCounter);
            testHarness.notifyOfCompletedCheckpoint(2L);
            TestUtils.checkLocalFs(outDir, 0, 2);
        }
    }

    private static class TestBulkWriterFactory
    implements BulkWriter.Factory<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        private TestBulkWriterFactory() {
        }

        public BulkWriter<Tuple2<String, Integer>> create(FSDataOutputStream out) {
            return new TestBulkWriter(out);
        }
    }

    private static class TestBulkWriter
    implements BulkWriter<Tuple2<String, Integer>> {
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        private final FSDataOutputStream stream;

        TestBulkWriter(FSDataOutputStream stream) {
            this.stream = (FSDataOutputStream)Preconditions.checkNotNull((Object)stream);
        }

        public void addElement(Tuple2<String, Integer> element) throws IOException {
            this.stream.write(((String)element.f0 + '@' + element.f1 + '\n').getBytes(CHARSET));
        }

        public void flush() throws IOException {
            this.stream.flush();
        }

        public void finish() throws IOException {
            this.flush();
        }
    }
}

