package com.microsoft.azure.flink.writer.internal.sink;

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
import org.jetbrains.annotations.NotNull;

/**
 * Write-ahead sink that hands the records of a checkpoint to Kusto through a
 * {@link KustoSinkCommon} helper.
 *
 * <p>The helper is created in {@link #open()} rather than in the constructor, so it is
 * {@code null} until the operator has been opened successfully.
 *
 * @param <IN> type of the records written by this sink
 */
@PublicEvolving
@Internal
public class KustoGenericWriteAheadSink<IN> extends GenericWriteAheadSink<IN> {
    private final KustoConnectionOptions connectionOptions;
    private final KustoWriteOptions writeOptions;
    /** Created in {@link #open()}; stays {@code null} if opening never completed. */
    private KustoSinkCommon<IN> kustoSinkCommon;
    private final TypeSerializer<IN> serializer;
    private final TypeInformation<IN> typeInformation;

    /**
     * Stores the Kusto configuration and forwards the checkpoint committer, serializer and
     * identifier string to the {@link GenericWriteAheadSink} constructor.
     *
     * @param kustoConnectionOptions connection settings passed on to {@link KustoSinkCommon}
     * @param kustoWriteOptions write settings passed on to {@link KustoSinkCommon}
     * @param checkpointCommitter committer forwarded to the superclass
     * @param typeSerializer serializer forwarded to the superclass and to {@link KustoSinkCommon}
     * @param typeInformation type information passed on to {@link KustoSinkCommon}
     * @param str identifier forwarded to the superclass (presumably the job id expected by
     *     {@code GenericWriteAheadSink} - confirm against the Flink version in use)
     * @throws Exception if the superclass constructor fails
     */
    public KustoGenericWriteAheadSink(KustoConnectionOptions kustoConnectionOptions, KustoWriteOptions kustoWriteOptions, CheckpointCommitter checkpointCommitter, TypeSerializer<IN> typeSerializer, TypeInformation<IN> typeInformation, String str) throws Exception {
        super(checkpointCommitter, typeSerializer, str);
        this.connectionOptions = kustoConnectionOptions;
        this.writeOptions = kustoWriteOptions;
        this.serializer = typeSerializer;
        this.typeInformation = typeInformation;
    }

    /**
     * Opens the superclass, verifies that checkpointing is enabled and then creates the
     * {@link KustoSinkCommon} helper used for ingestion.
     *
     * @throws IllegalStateException if checkpointing is not enabled for the job
     * @throws Exception if the superclass or the helper fails to initialize
     */
    public void open() throws Exception {
        super.open();
        if (!getRuntimeContext().isCheckpointingEnabled()) {
            throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
        }
        this.kustoSinkCommon = new KustoSinkCommon<>(this.connectionOptions, this.writeOptions, super.getRuntimeContext().getMetricGroup(), this.serializer, this.typeInformation, KustoGenericWriteAheadSink.class.getSimpleName());
    }

    /**
     * Delegates the given records to {@link KustoSinkCommon#ingest}.
     *
     * @param iterable records to ingest
     * @param j not used by this implementation (presumably the checkpoint id - confirm
     *     against {@code GenericWriteAheadSink#sendValues})
     * @param j2 not used by this implementation (presumably the checkpoint timestamp - confirm
     *     against {@code GenericWriteAheadSink#sendValues})
     * @return the result reported by the ingest call
     * @throws Exception if ingestion fails
     */
    protected boolean sendValues(@NotNull Iterable<IN> iterable, long j, long j2) throws Exception {
        return this.kustoSinkCommon.ingest(iterable);
    }

    /**
     * Closes the superclass and then the Kusto helper.
     *
     * <p>The helper is released in a {@code finally} block so that a failure in
     * {@code super.close()} does not leak it, and it is null-checked because it only exists
     * after {@link #open()} has run to completion.
     *
     * @throws Exception if closing the superclass or the helper fails
     */
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // open() may have failed before the helper was assigned; avoid masking that
            // failure with a NullPointerException here.
            if (this.kustoSinkCommon != null) {
                this.kustoSinkCommon.close();
            }
        }
    }
}
