package org.apache.beam.sdk.io.kafka;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.expansion.service.ExpansionService;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.text.MatchesPattern;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.powermock.reflect.Whitebox;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.class */
public class KafkaIOExternalTest {

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOExternalTest$TestStreamObserver.class */
    private static class TestStreamObserver<T> implements StreamObserver<T> {
        private T result;

        private TestStreamObserver() {
        }

        public void onNext(T t) {
            this.result = t;
        }

        public void onError(Throwable th) {
            throw new RuntimeException("Should not happen", th);
        }

        public void onCompleted() {
        }
    }

    private void verifyKafkaReadComposite(RunnerApi.PTransform pTransform, ExpansionApi.ExpansionResponse expansionResponse) throws Exception {
        MatcherAssert.assertThat(pTransform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*Impulse.*")));
        MatcherAssert.assertThat(pTransform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*GenerateKafkaSourceDescriptor.*")));
        MatcherAssert.assertThat(pTransform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*ReadSourceDescriptors.*")));
        Assert.assertNotNull(RunnerApi.ParDoPayload.parseFrom(expansionResponse.getComponents().getTransformsOrThrow(pTransform.getSubtransforms(2)).getSpec().getPayload()).getRestrictionCoderId());
    }

    @Test
    public void testConstructKafkaRead() throws Exception {
        ExpansionApi.ExpansionRequest build = ExpansionApi.ExpansionRequest.newBuilder().setComponents(RunnerApi.Components.getDefaultInstance()).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:org.apache.beam:kafka_read_with_metadata:v1").setPayload(encodeRow(Row.withSchema(Schema.of(new Schema.Field[]{Schema.Field.of("topics", Schema.FieldType.array(Schema.FieldType.STRING)), Schema.Field.of("consumer_config", Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING)), Schema.Field.of("key_deserializer", Schema.FieldType.STRING), Schema.Field.of("value_deserializer", Schema.FieldType.STRING), Schema.Field.of("start_read_time", Schema.FieldType.INT64), Schema.Field.of("commit_offset_in_finalize", Schema.FieldType.BOOLEAN), Schema.Field.of("timestamp_policy", Schema.FieldType.STRING)})).withFieldValue("topics", ImmutableList.of("topic1", "topic2")).withFieldValue("consumer_config", ImmutableMap.builder().put("bootstrap.servers", "server1:port,server2:port").put("key2", "value2").put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer").put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer").build()).withFieldValue("key_deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer").withFieldValue("value_deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer").withFieldValue("start_read_time", 100L).withFieldValue("commit_offset_in_finalize", false).withFieldValue("timestamp_policy", "ProcessingTime").build()).toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver testStreamObserver = new TestStreamObserver();
        expansionService.expand(build, testStreamObserver);
        ExpansionApi.ExpansionResponse expansionResponse = (ExpansionApi.ExpansionResponse) testStreamObserver.result;
        RunnerApi.PTransform transform = expansionResponse.getTransform();
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*Convert-to-ExternalKafkaRecord.*")));
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*Convert-ConvertTransform.*")));
        MatcherAssert.assertThat(Integer.valueOf(transform.getInputsCount()), Matchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(transform.getOutputsCount()), Matchers.is(1));
        verifyKafkaReadComposite(expansionResponse.getComponents().getTransformsOrThrow(expansionResponse.getComponents().getTransformsOrThrow(transform.getSubtransforms(0)).getSubtransforms(0)), expansionResponse);
    }

    @Test
    public void testKafkaRecordToExternalKafkaRecord() throws Exception {
        RecordHeaders recordHeaders = new RecordHeaders();
        recordHeaders.add("dummyHeaderKey", "dummyHeaderVal".getBytes(StandardCharsets.UTF_8));
        KafkaIO.ByteArrayKafkaRecord externalKafkaRecord = KafkaIO.RowsWithMetadata.toExternalKafkaRecord(new KafkaRecord("dummyTopic", 111, 222L, 12345L, KafkaTimestampType.LOG_APPEND_TIME, recordHeaders, "dummyKey".getBytes(StandardCharsets.UTF_8), "dummyValue".getBytes(StandardCharsets.UTF_8)));
        Assert.assertEquals("dummyTopic", externalKafkaRecord.topic);
        Assert.assertEquals(111L, externalKafkaRecord.partition);
        Assert.assertEquals(222L, externalKafkaRecord.offset);
        Assert.assertEquals(12345L, externalKafkaRecord.timestamp);
        Assert.assertEquals(KafkaTimestampType.LOG_APPEND_TIME.id, externalKafkaRecord.timestampTypeId);
        Assert.assertEquals(KafkaTimestampType.LOG_APPEND_TIME.name, externalKafkaRecord.timestampTypeName);
        Assert.assertEquals("dummyKey", new String(externalKafkaRecord.key, "UTF-8"));
        Assert.assertEquals("dummyValue", new String(externalKafkaRecord.value, "UTF-8"));
        Assert.assertEquals(1L, externalKafkaRecord.headers.size());
        Assert.assertEquals("dummyHeaderKey", ((KafkaIO.KafkaHeader) externalKafkaRecord.headers.get(0)).key);
        Assert.assertEquals("dummyHeaderVal", new String(((KafkaIO.KafkaHeader) externalKafkaRecord.headers.get(0)).value, "UTF-8"));
    }

    @Test
    public void testKafkaRecordToExternalKafkaRecordWithNullKeyAndValue() throws Exception {
        RecordHeaders recordHeaders = new RecordHeaders();
        recordHeaders.add("dummyHeaderKey", "dummyHeaderVal".getBytes(StandardCharsets.UTF_8));
        KafkaIO.ByteArrayKafkaRecord externalKafkaRecord = KafkaIO.RowsWithMetadata.toExternalKafkaRecord(new KafkaRecord("dummyTopic", 111, 222L, 12345L, KafkaTimestampType.LOG_APPEND_TIME, recordHeaders, (Object) null, (Object) null));
        Assert.assertEquals("dummyTopic", externalKafkaRecord.topic);
        Assert.assertEquals(111L, externalKafkaRecord.partition);
        Assert.assertEquals(222L, externalKafkaRecord.offset);
        Assert.assertEquals(12345L, externalKafkaRecord.timestamp);
        Assert.assertEquals(KafkaTimestampType.LOG_APPEND_TIME.id, externalKafkaRecord.timestampTypeId);
        Assert.assertEquals(KafkaTimestampType.LOG_APPEND_TIME.name, externalKafkaRecord.timestampTypeName);
        Assert.assertNull(externalKafkaRecord.key);
        Assert.assertNull(externalKafkaRecord.value);
        Assert.assertEquals(1L, externalKafkaRecord.headers.size());
        Assert.assertEquals("dummyHeaderKey", ((KafkaIO.KafkaHeader) externalKafkaRecord.headers.get(0)).key);
        Assert.assertEquals("dummyHeaderVal", new String(((KafkaIO.KafkaHeader) externalKafkaRecord.headers.get(0)).value, "UTF-8"));
    }

    @Test
    public void testConstructKafkaReadWithoutMetadata() throws Exception {
        ExpansionApi.ExpansionRequest build = ExpansionApi.ExpansionRequest.newBuilder().setComponents(RunnerApi.Components.getDefaultInstance()).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:org.apache.beam:kafka_read_without_metadata:v1").setPayload(encodeRow(Row.withSchema(Schema.of(new Schema.Field[]{Schema.Field.of("topics", Schema.FieldType.array(Schema.FieldType.STRING)), Schema.Field.of("consumer_config", Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING)), Schema.Field.of("key_deserializer", Schema.FieldType.STRING), Schema.Field.of("value_deserializer", Schema.FieldType.STRING), Schema.Field.of("start_read_time", Schema.FieldType.INT64), Schema.Field.of("commit_offset_in_finalize", Schema.FieldType.BOOLEAN), Schema.Field.of("timestamp_policy", Schema.FieldType.STRING)})).withFieldValue("topics", ImmutableList.of("topic1", "topic2")).withFieldValue("consumer_config", ImmutableMap.builder().put("bootstrap.servers", "server1:port,server2:port").put("key2", "value2").put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer").put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer").build()).withFieldValue("key_deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer").withFieldValue("value_deserializer", "org.apache.kafka.common.serialization.LongDeserializer").withFieldValue("start_read_time", 100L).withFieldValue("commit_offset_in_finalize", false).withFieldValue("timestamp_policy", "ProcessingTime").build()).toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver testStreamObserver = new TestStreamObserver();
        expansionService.expand(build, testStreamObserver);
        ExpansionApi.ExpansionResponse expansionResponse = (ExpansionApi.ExpansionResponse) testStreamObserver.result;
        RunnerApi.PTransform transform = expansionResponse.getTransform();
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*Remove-Kafka-Metadata.*")));
        MatcherAssert.assertThat(Integer.valueOf(transform.getInputsCount()), Matchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(transform.getOutputsCount()), Matchers.is(1));
        RunnerApi.PTransform transformsOrThrow = expansionResponse.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
        expansionResponse.getComponents().getTransformsOrThrow(transformsOrThrow.getSubtransforms(0));
        verifyKafkaReadComposite(expansionResponse.getComponents().getTransformsOrThrow(transformsOrThrow.getSubtransforms(0)), expansionResponse);
    }

    @Test
    public void testConstructKafkaWrite() throws Exception {
        ImmutableMap build = ImmutableMap.builder().put("bootstrap.servers", "server1:port,server2:port").put("retries", "3").build();
        ExternalTransforms.ExternalConfigurationPayload encodeRow = encodeRow(Row.withSchema(Schema.of(new Schema.Field[]{Schema.Field.of("topic", Schema.FieldType.STRING), Schema.Field.of("producer_config", Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING)), Schema.Field.of("key_serializer", Schema.FieldType.STRING), Schema.Field.of("value_serializer", Schema.FieldType.STRING)})).withFieldValue("topic", "topic").withFieldValue("producer_config", build).withFieldValue("key_serializer", "org.apache.kafka.common.serialization.ByteArraySerializer").withFieldValue("value_serializer", "org.apache.kafka.common.serialization.LongSerializer").build());
        Pipeline create = Pipeline.create();
        create.apply(Impulse.create()).apply(WithKeys.of("key"));
        RunnerApi.Pipeline proto = PipelineTranslation.toProto(create);
        ExpansionApi.ExpansionRequest build2 = ExpansionApi.ExpansionRequest.newBuilder().setComponents(proto.getComponents()).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").putInputs("input", (String) Iterables.getOnlyElement(((RunnerApi.PTransform) Iterables.getLast(proto.getComponents().getTransformsMap().values())).getOutputsMap().values())).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:org.apache.beam:kafka_write:v1").setPayload(encodeRow.toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver testStreamObserver = new TestStreamObserver();
        expansionService.expand(build2, testStreamObserver);
        ExpansionApi.ExpansionResponse expansionResponse = (ExpansionApi.ExpansionResponse) testStreamObserver.result;
        RunnerApi.PTransform transform = expansionResponse.getTransform();
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*Kafka-ProducerRecord.*")));
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-WriteRecords.*")));
        MatcherAssert.assertThat(Integer.valueOf(transform.getInputsCount()), Matchers.is(1));
        MatcherAssert.assertThat(Integer.valueOf(transform.getOutputsCount()), Matchers.is(0));
        DoFn doFn = ParDoTranslation.getDoFn(RunnerApi.ParDoPayload.parseFrom(expansionResponse.getComponents().getTransformsOrThrow(expansionResponse.getComponents().getTransformsOrThrow(expansionResponse.getComponents().getTransformsOrThrow(transform.getSubtransforms(1)).getSubtransforms(0)).getSubtransforms(0)).getSpec().getPayload()));
        MatcherAssert.assertThat(doFn, Matchers.instanceOf(KafkaWriter.class));
        KafkaIO.WriteRecords writeRecords = (KafkaIO.WriteRecords) Whitebox.getInternalState(doFn, "spec");
        MatcherAssert.assertThat(writeRecords.getProducerConfig(), Matchers.is(build));
        MatcherAssert.assertThat(writeRecords.getTopic(), Matchers.is("topic"));
        MatcherAssert.assertThat(writeRecords.getKeySerializer().getName(), Matchers.is("org.apache.kafka.common.serialization.ByteArraySerializer"));
        MatcherAssert.assertThat(writeRecords.getValueSerializer().getName(), Matchers.is("org.apache.kafka.common.serialization.LongSerializer"));
    }

    private static ExternalTransforms.ExternalConfigurationPayload encodeRow(Row row) {
        ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
        try {
            SchemaCoder.of(row.getSchema()).encode(row, byteStringOutputStream);
            return ExternalTransforms.ExternalConfigurationPayload.newBuilder().setSchema(SchemaTranslation.schemaToProto(row.getSchema(), true)).setPayload(byteStringOutputStream.toByteString()).build();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
