package org.apache.flink.connector.firehose.sink;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletionException;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.firehose.model.Record;

/* loaded from: input_file:org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.class */
public class KinesisFirehoseSinkWriterTest {
    private KinesisFirehoseSinkWriter<String> sinkWriter;
    private static final ElementConverter<String, Record> ELEMENT_CONVERTER_PLACEHOLDER = KinesisFirehoseSinkElementConverter.builder().setSerializationSchema(new SimpleStringSchema()).build();

    @BeforeEach
    void setup() {
        this.sinkWriter = new KinesisFirehoseSinkWriter<>(ELEMENT_CONVERTER_PLACEHOLDER, new TestSinkInitContext(), 50, 16, 10000, 4194304L, 5000L, 1024000L, true, "streamName", AWSServicesTestUtils.createConfig("https://fake_aws_endpoint"));
    }

    @Test
    void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() {
        Assertions.assertThat(this.sinkWriter.getSizeInBytes((Record) Record.builder().data(SdkBytes.fromUtf8String("{many hands make light work;")).build())).isEqualTo("{many hands make light work;".getBytes(StandardCharsets.US_ASCII).length);
    }

    @Test
    void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures() throws IOException, InterruptedException {
        TestSinkInitContext testSinkInitContext = new TestSinkInitContext();
        StatefulSink.StatefulSinkWriter createWriter = new KinesisFirehoseSink(ELEMENT_CONVERTER_PLACEHOLDER, 12, 16, 10000, 4194304L, 5000L, 1024000L, true, "test-stream", AWSServicesTestUtils.createConfig("https://localhost")).createWriter(testSinkInitContext);
        for (int i = 0; i < 12; i++) {
            createWriter.write("data_bytes", (SinkWriter.Context) null);
        }
        Assertions.assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> {
            createWriter.flush(true);
        }).withCauseInstanceOf(SdkClientException.class).withMessageContaining("Unable to execute HTTP request: Connection refused: localhost/127.0.0.1:443");
        Assertions.assertThat(testSinkInitContext.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(12L);
        Assertions.assertThat(testSinkInitContext.metricGroup().getNumRecordsSendErrorsCounter().getCount()).isEqualTo(12L);
    }
}
