/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.filesystem.stream;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.filesystem.FileSystemFactory;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.TableMetaStoreFactory;
import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
import org.apache.flink.table.filesystem.stream.PartitionCommitter;
import org.apache.flink.table.filesystem.stream.StreamingFileWriter;
import org.apache.flink.table.filesystem.stream.compact.CompactBucketWriter;
import org.apache.flink.table.filesystem.stream.compact.CompactCoordinator;
import org.apache.flink.table.filesystem.stream.compact.CompactFileWriter;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages;
import org.apache.flink.table.filesystem.stream.compact.CompactOperator;
import org.apache.flink.table.filesystem.stream.compact.CompactReader;
import org.apache.flink.table.filesystem.stream.compact.CompactWriter;
import org.apache.flink.util.function.SupplierWithException;

public class StreamingSink {
    private StreamingSink() {
    }

    public static <T> DataStream<PartitionCommitInfo> writer(DataStream<T> inputStream, long bucketCheckInterval, StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, int parallelism) {
        StreamingFileWriter<T> fileWriter = new StreamingFileWriter<T>(bucketCheckInterval, bucketsBuilder);
        return inputStream.transform(StreamingFileWriter.class.getSimpleName(), TypeInformation.of(PartitionCommitInfo.class), fileWriter).setParallelism(parallelism);
    }

    public static <T> DataStream<PartitionCommitInfo> compactionWriter(DataStream<T> inputStream, long bucketCheckInterval, StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, FileSystemFactory fsFactory, Path path, CompactReader.Factory<T> readFactory, long targetFileSize, int parallelism) {
        CompactFileWriter<T> writer = new CompactFileWriter<T>(bucketCheckInterval, bucketsBuilder);
        SupplierWithException fsSupplier = (SupplierWithException & Serializable)() -> fsFactory.create(path.toUri());
        CompactCoordinator coordinator = new CompactCoordinator((SupplierWithException<FileSystem, IOException>)fsSupplier, targetFileSize);
        SingleOutputStreamOperator coordinatorOp = inputStream.transform("streaming-writer", TypeInformation.of(CompactMessages.CoordinatorInput.class), writer).setParallelism(parallelism).transform("compact-coordinator", TypeInformation.of(CompactMessages.CoordinatorOutput.class), (OneInputStreamOperator)coordinator).setParallelism(1).setMaxParallelism(1);
        CompactWriter.Factory writerFactory = CompactBucketWriter.factory(() -> bucketsBuilder.createBucketWriter());
        CompactOperator<T> compacter = new CompactOperator<T>((SupplierWithException<FileSystem, IOException>)fsSupplier, readFactory, writerFactory);
        return coordinatorOp.broadcast().transform("compact-operator", TypeInformation.of(PartitionCommitInfo.class), compacter).setParallelism(parallelism);
    }

    public static DataStreamSink<?> sink(DataStream<PartitionCommitInfo> writer, Path locationPath, ObjectIdentifier identifier, List<String> partitionKeys, TableMetaStoreFactory msFactory, FileSystemFactory fsFactory, Configuration options) {
        SingleOutputStreamOperator stream = writer;
        if (partitionKeys.size() > 0 && options.contains(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND)) {
            PartitionCommitter committer = new PartitionCommitter(locationPath, identifier, partitionKeys, msFactory, fsFactory, options);
            stream = writer.transform(PartitionCommitter.class.getSimpleName(), Types.VOID, (OneInputStreamOperator)committer).setParallelism(1).setMaxParallelism(1);
        }
        return stream.addSink((SinkFunction)new DiscardingSink()).name("end").setParallelism(1);
    }
}

