/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Assert;

public class TestUtils {
    static final int MAX_PARALLELISM = 10;

    static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createRescalingTestSink(File outDir, int totalParallelism, int taskIdx, long inactivityInterval, long partMaxSize) throws Exception {
        DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy.create().withMaxPartSize(partMaxSize).withRolloverInterval(inactivityInterval).withInactivityInterval(inactivityInterval).build();
        TupleToStringBucketer bucketer = new TupleToStringBucketer();
        Encoder & Serializable encoder = (Encoder & Serializable)(element, stream) -> {
            stream.write(((String)element.f0 + '@' + element.f1).getBytes(StandardCharsets.UTF_8));
            stream.write(10);
        };
        return TestUtils.createCustomRescalingTestSink(outDir, totalParallelism, taskIdx, 10L, bucketer, (Encoder<Tuple2<String, Integer>>)encoder, (RollingPolicy<Tuple2<String, Integer>, String>)rollingPolicy, (BucketFactory<Tuple2<String, Integer>, String>)new DefaultBucketFactoryImpl());
    }

    /**
     * Creates a test harness around a row-format {@link StreamingFileSink} assembled from
     * the supplied components.
     *
     * @param outDir base directory the sink writes to
     * @param totalParallelism parallelism handed to the test harness
     * @param taskIdx subtask index handed to the test harness
     * @param bucketCheckInterval value passed to the sink builder's bucket check interval
     * @param bucketer assigner that maps each element to its bucket id
     * @param writer encoder used to serialize elements into part files
     * @param rollingPolicy policy deciding when part files are rolled
     * @param bucketFactory factory used by the sink to create buckets
     * @return the harness wrapping the configured sink, created with {@link #MAX_PARALLELISM}
     * @throws Exception if creating the harness fails
     */
    static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createCustomRescalingTestSink(File outDir, int totalParallelism, int taskIdx, long bucketCheckInterval, BucketAssigner<Tuple2<String, Integer>, String> bucketer, Encoder<Tuple2<String, Integer>> writer, RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy, BucketFactory<Tuple2<String, Integer>, String> bucketFactory) throws Exception {
        StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
                .forRowFormat(new Path(outDir.toURI()), writer)
                .withBucketAssigner(bucketer)
                .withRollingPolicy(rollingPolicy)
                .withBucketCheckInterval(bucketCheckInterval)
                .withBucketFactory(bucketFactory)
                .build();
        // Use the shared constant rather than repeating its value as a literal.
        return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), MAX_PARALLELISM, totalParallelism, taskIdx);
    }

    /**
     * Creates a test harness around a bulk-format {@link StreamingFileSink} assembled from
     * the supplied components.
     *
     * @param outDir base directory the sink writes to
     * @param totalParallelism parallelism handed to the test harness
     * @param taskIdx subtask index handed to the test harness
     * @param bucketCheckInterval value passed to the sink builder's bucket check interval
     * @param bucketer assigner that maps each element to its bucket id
     * @param writer factory for the bulk writers used to write part files
     * @param bucketFactory factory used by the sink to create buckets
     * @return the harness wrapping the configured sink, created with {@link #MAX_PARALLELISM}
     * @throws Exception if creating the harness fails
     */
    static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createTestSinkWithBulkEncoder(File outDir, int totalParallelism, int taskIdx, long bucketCheckInterval, BucketAssigner<Tuple2<String, Integer>, String> bucketer, BulkWriter.Factory<Tuple2<String, Integer>> writer, BucketFactory<Tuple2<String, Integer>, String> bucketFactory) throws Exception {
        StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
                .forBulkFormat(new Path(outDir.toURI()), writer)
                .withBucketAssigner(bucketer)
                .withBucketCheckInterval(bucketCheckInterval)
                .withBucketFactory(bucketFactory)
                .build();
        // Use the shared constant rather than repeating its value as a literal.
        return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), MAX_PARALLELISM, totalParallelism, taskIdx);
    }

    /**
     * Recursively scans {@code outDir} and asserts how many files it contains in each state.
     *
     * <p>Files whose absolute path ends with {@code crc} are ignored. Of the remaining files,
     * those whose file name starts with a dot are counted as in-progress and all others as
     * completed.
     *
     * @param outDir directory to scan recursively
     * @param expectedInProgress expected number of dot-prefixed files
     * @param expectedCompleted expected number of all other (non-crc) files
     */
    static void checkLocalFs(File outDir, int expectedInProgress, int expectedCompleted) {
        int pendingCount = 0;
        int completedCount = 0;
        for (File candidate : FileUtils.listFiles(outDir, null, true)) {
            if (!candidate.getAbsolutePath().endsWith("crc")) {
                String fileName = candidate.toPath().getFileName().toString();
                if (fileName.startsWith(".")) {
                    pendingCount++;
                } else {
                    completedCount++;
                }
            }
        }
        Assert.assertEquals(expectedInProgress, pendingCount);
        Assert.assertEquals(expectedCompleted, completedCount);
    }

    /**
     * Reads every file found recursively under {@code directory} and returns its content
     * keyed by file.
     *
     * <p>Contents are decoded as UTF-8 explicitly, so the result does not depend on the
     * platform default charset.
     *
     * @param directory directory to scan recursively
     * @return a mutable map from each file found to its full content as a string
     * @throws IOException if a file cannot be read
     */
    static Map<File, String> getFileContentByPath(File directory) throws IOException {
        Map<File, String> contents = new HashMap<>(4);
        Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
        for (File file : filesInBucket) {
            contents.put(file, FileUtils.readFileToString(file, StandardCharsets.UTF_8));
        }
        return contents;
    }

    static class MockListState<T>
    implements ListState<T> {
        private final List<T> backingList = new ArrayList<T>();

        MockListState() {
        }

        public List<T> getBackingList() {
            return this.backingList;
        }

        public void update(List<T> values) {
            this.backingList.clear();
            this.addAll(values);
        }

        public void addAll(List<T> values) {
            this.backingList.addAll(values);
        }

        public Iterable<T> get() {
            return new Iterable<T>(){

                @Override
                @Nonnull
                public Iterator<T> iterator() {
                    return backingList.iterator();
                }
            };
        }

        public void add(T value) {
            this.backingList.add(value);
        }

        public void clear() {
            this.backingList.clear();
        }
    }

    static class MockSinkContext
    implements SinkFunction.Context {
        @Nullable
        private Long elementTimestamp;
        private long watermark;
        private long processingTime;

        MockSinkContext(@Nullable Long elementTimestamp, long watermark, long processingTime) {
            this.elementTimestamp = elementTimestamp;
            this.watermark = watermark;
            this.processingTime = processingTime;
        }

        public long currentProcessingTime() {
            return this.processingTime;
        }

        public long currentWatermark() {
            return this.watermark;
        }

        @Nullable
        public Long timestamp() {
            return this.elementTimestamp;
        }
    }

    static class StringIdentityBucketAssigner
    implements BucketAssigner<String, String> {
        private static final long serialVersionUID = 1L;

        StringIdentityBucketAssigner() {
        }

        public String getBucketId(String element, BucketAssigner.Context context) {
            return element;
        }

        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    static class TupleToStringBucketer
    implements BucketAssigner<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1L;

        TupleToStringBucketer() {
        }

        public String getBucketId(Tuple2<String, Integer> element, BucketAssigner.Context context) {
            return (String)element.f0;
        }

        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }
}

