package com.microsoft.azure.kusto;

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
import com.microsoft.azure.flink.writer.internal.committer.KustoCommitter;
import com.microsoft.azure.flink.writer.internal.sink.KustoGenericWriteAheadSink;
import com.microsoft.azure.flink.writer.internal.sink.KustoSink;
import java.util.UUID;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.util.Preconditions;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fluent entry point for attaching a Kusto sink to a Flink {@link DataStream}.
 *
 * <p>Obtain an instance through {@link #builder()}, supply the connection and write options
 * through the setters, then call one of the {@code build} / {@code buildWriteAheadSink}
 * methods with the stream to be written.
 *
 * <p>Only streams whose type information is a {@link TupleTypeInfo}, {@link RowTypeInfo},
 * {@link CaseClassTypeInfo} or {@link PojoTypeInfo} are accepted.
 */
@PublicEvolving
public class KustoWriteSink {
    protected static final Logger LOG = LoggerFactory.getLogger(KustoWriteSink.class);

    /** Parallelism applied when the caller does not specify one. */
    private static final int DEFAULT_PARALLELISM = 1;

    protected KustoConnectionOptions connectionOptions;
    protected KustoWriteOptions writeOptions;

    private KustoWriteSink() {
    }

    /**
     * Creates a new, unconfigured builder.
     *
     * @return a fresh {@code KustoWriteSink} with no options set
     */
    @Contract(" -> new")
    @NotNull
    public static KustoWriteSink builder() {
        return new KustoWriteSink();
    }

    /**
     * Sets the connection options used by the sink.
     *
     * @param kustoConnectionOptions the connection options; must not be {@code null}
     * @return this builder, for chaining
     * @throws IllegalArgumentException if {@code kustoConnectionOptions} is {@code null}
     */
    public KustoWriteSink setConnectionOptions(KustoConnectionOptions kustoConnectionOptions) {
        if (kustoConnectionOptions == null) {
            throw new IllegalArgumentException("Connection options cannot be null. Please use KustoConnectionOptions.Builder() to create one. ");
        }
        this.connectionOptions = kustoConnectionOptions;
        return this;
    }

    /**
     * Sets the write options used by the sink.
     *
     * @param kustoWriteOptions the write options; must not be {@code null}
     * @return this builder, for chaining
     * @throws IllegalArgumentException if {@code kustoWriteOptions} is {@code null}
     */
    public KustoWriteSink setWriteOptions(KustoWriteOptions kustoWriteOptions) {
        if (kustoWriteOptions == null) {
            // The message names the write options; it previously (incorrectly) referred to
            // the connection options, which pointed callers at the wrong setter.
            throw new IllegalArgumentException("Write options cannot be null. Please supply a KustoWriteOptions instance.");
        }
        this.writeOptions = kustoWriteOptions;
        return this;
    }

    /**
     * Verifies that the builder holds everything required to create a sink.
     *
     * @throws NullPointerException if the connection options, the write options, or the
     *     database / table of the write options are {@code null}
     */
    protected void sanityCheck() {
        Preconditions.checkNotNull(this.connectionOptions, "Kusto connection options must be supplied.");
        Preconditions.checkNotNull(this.writeOptions, "Kusto write options must be supplied.");
        Preconditions.checkNotNull(this.writeOptions.getDatabase(), "Kusto write options should have database name specified");
        Preconditions.checkNotNull(this.writeOptions.getTable(), "Kusto write options should have table specified");
    }

    /**
     * Attaches a {@link KustoSink} to the stream with a parallelism of 1.
     *
     * @param dataStream the stream to write
     * @param <IN> the element type of the stream
     * @throws IllegalArgumentException if the stream type is not supported
     * @throws NullPointerException if required options are missing
     */
    public <IN> void build(@NotNull DataStream<IN> dataStream) throws Exception {
        build(dataStream, DEFAULT_PARALLELISM);
    }

    /**
     * Attaches a {@link KustoSink} to the stream with the given parallelism.
     *
     * @param dataStream the stream to write
     * @param parallelism the parallelism to set on the sink
     * @param <IN> the element type of the stream
     * @throws IllegalArgumentException if the stream type is not supported
     * @throws NullPointerException if required options are missing
     */
    public <IN> void build(@NotNull DataStream<IN> dataStream, int parallelism) throws Exception {
        sinkDataStream(dataStream, parallelism);
    }

    /**
     * Attaches a {@link KustoSink} to the stream with the given parallelism, operator name
     * and operator uid.
     *
     * @param dataStream the stream to write
     * @param parallelism the parallelism to set on the sink
     * @param name the name to assign to the sink operator
     * @param uid the uid to assign to the sink operator
     * @param <IN> the element type of the stream
     * @throws IllegalArgumentException if the stream type is not supported
     * @throws NullPointerException if required options are missing
     */
    public <IN> void build(@NotNull DataStream<IN> dataStream, int parallelism, String name, String uid) throws Exception {
        sinkDataStream(dataStream, parallelism).name(name).uid(uid);
    }

    /**
     * Attaches a {@link KustoGenericWriteAheadSink} to the stream with a parallelism of 1.
     *
     * @param dataStream the stream to write
     * @param <IN> the element type of the stream
     * @throws IllegalArgumentException if the stream type is not supported
     * @throws NullPointerException if required options are missing
     */
    public <IN> void buildWriteAheadSink(@NotNull DataStream<IN> dataStream) throws Exception {
        buildWriteAheadSink(dataStream, DEFAULT_PARALLELISM);
    }

    /**
     * Attaches a {@link KustoGenericWriteAheadSink} to the stream with the given parallelism.
     * The operator is named {@code KustoGenericWriteAheadSink-<database>-<table>}.
     *
     * @param dataStream the stream to write
     * @param parallelism the parallelism to set on the operator
     * @param <IN> the element type of the stream
     * @throws IllegalArgumentException if the stream type is not supported
     * @throws NullPointerException if required options are missing
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // project sink/committer generics are not visible here
    public <IN> void buildWriteAheadSink(@NotNull DataStream<IN> dataStream, int parallelism) throws Exception {
        TypeInformation<IN> type = dataStream.getType();
        validate(type);
        LOG.info("Building GenericWriteAheadSink with WriteOptions: {} and ConnectionOptions {}", this.writeOptions, this.connectionOptions);

        TypeSerializer<IN> serializer = type.createSerializer(new SerializerConfigImpl());
        String operatorName = String.format("KustoGenericWriteAheadSink-%s-%s", this.writeOptions.getDatabase(), this.writeOptions.getTable());
        // A fresh random UUID is handed to the sink on every call
        // (presumably a job/sink identifier - confirm against KustoGenericWriteAheadSink).
        String sinkId = UUID.randomUUID().toString();
        KustoCommitter committer = new KustoCommitter(this.connectionOptions, this.writeOptions);

        dataStream
                .transform(operatorName, type, new KustoGenericWriteAheadSink(this.connectionOptions, this.writeOptions, committer, serializer, type, sinkId))
                .setParallelism(parallelism);
    }

    /** Validates the stream and options, then attaches a {@link KustoSink} with the given parallelism. */
    @SuppressWarnings({"unchecked", "rawtypes"}) // project sink generics are not visible here
    private <IN> DataStreamSink<IN> sinkDataStream(@NotNull DataStream<IN> dataStream, int parallelism) {
        TypeInformation<IN> type = dataStream.getType();
        validate(type);
        LOG.info("Building KustoSink with WriteOptions: {} and ConnectionOptions {}", this.writeOptions, this.connectionOptions);

        TypeSerializer<IN> serializer = type.createSerializer(new SerializerConfigImpl());
        return dataStream.sinkTo(new KustoSink(this.connectionOptions, this.writeOptions, serializer, type)).setParallelism(parallelism);
    }

    /** Rejects unsupported stream types first, then checks that the required options are present. */
    private void validate(TypeInformation<?> type) {
        if (!isSupportedType(type)) {
            throw new IllegalArgumentException("No support for the type of the given DataStream: " + type);
        }
        sanityCheck();
    }

    /** Returns whether the type information is one of the tuple, row, case-class or POJO kinds. */
    private static boolean isSupportedType(TypeInformation<?> type) {
        return type instanceof TupleTypeInfo
                || type instanceof RowTypeInfo
                || type instanceof CaseClassTypeInfo
                || type instanceof PojoTypeInfo;
    }
}
