package org.apache.beam.sdk.io.kafka;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.expansion.service.ExpansionService;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.powermock.reflect.Whitebox;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.class */
public class KafkaIOExternalTest {

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOExternalTest$TestStreamObserver.class */
    private static class TestStreamObserver<T> implements StreamObserver<T> {
        private T result;

        private TestStreamObserver() {
        }

        public void onNext(T t) {
            this.result = t;
        }

        public void onError(Throwable th) {
            throw new RuntimeException("Should not happen", th);
        }

        public void onCompleted() {
        }
    }

    @Test
    public void testConstructKafkaRead() throws Exception {
        ImmutableList of = ImmutableList.of("topic1", "topic2");
        ImmutableMap build = ImmutableMap.builder().put("bootstrap.servers", "server1:port,server2:port").put("key2", "value2").put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer").put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer").build();
        ExpansionApi.ExpansionRequest build2 = ExpansionApi.ExpansionRequest.newBuilder().setComponents(RunnerApi.Components.getDefaultInstance()).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:external:java:kafka:read:v1").setPayload(ExternalTransforms.ExternalConfigurationPayload.newBuilder().putConfiguration("topics", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:iterable:v1").addCoderUrn("beam:coder:string_utf8:v1").setPayload(ByteString.copyFrom(listAsBytes(of))).build()).putConfiguration("consumer_config", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:iterable:v1").addCoderUrn("beam:coder:kv:v1").addCoderUrn("beam:coder:string_utf8:v1").addCoderUrn("beam:coder:string_utf8:v1").setPayload(ByteString.copyFrom(mapAsBytes(build))).build()).putConfiguration("key_deserializer", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:string_utf8:v1").setPayload(ByteString.copyFrom(encodeString("org.apache.kafka.common.serialization.ByteArrayDeserializer"))).build()).putConfiguration("value_deserializer", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:string_utf8:v1").setPayload(ByteString.copyFrom(encodeString("org.apache.kafka.common.serialization.LongDeserializer"))).build()).build().toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver testStreamObserver = new TestStreamObserver();
        expansionService.expand(build2, testStreamObserver);
        ExpansionApi.ExpansionResponse expansionResponse = (ExpansionApi.ExpansionResponse) testStreamObserver.result;
        RunnerApi.PTransform transform = expansionResponse.getTransform();
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.contains(new String[]{"test_namespacetest/KafkaIO.Read", "test_namespacetest/Remove Kafka Metadata"}));
        MatcherAssert.assertThat(Integer.valueOf(transform.getInputsCount()), Matchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(transform.getOutputsCount()), Matchers.is(1));
        KafkaIO.Read spec = ReadTranslation.unboundedSourceFromProto(RunnerApi.ReadPayload.parseFrom(expansionResponse.getComponents().getTransformsOrThrow(expansionResponse.getComponents().getTransformsOrThrow(transform.getSubtransforms(0)).getSubtransforms(0)).getSpec().getPayload())).getSpec();
        MatcherAssert.assertThat(spec.getConsumerConfig(), Matchers.is(build));
        MatcherAssert.assertThat(spec.getTopics(), Matchers.is(of));
        MatcherAssert.assertThat(spec.getKeyDeserializerProvider().getDeserializer(spec.getConsumerConfig(), true).getClass().getName(), Matchers.is("org.apache.kafka.common.serialization.ByteArrayDeserializer"));
        MatcherAssert.assertThat(spec.getValueDeserializerProvider().getDeserializer(spec.getConsumerConfig(), false).getClass().getName(), Matchers.is("org.apache.kafka.common.serialization.LongDeserializer"));
    }

    @Test
    public void testConstructKafkaWrite() throws Exception {
        ImmutableMap build = ImmutableMap.builder().put("bootstrap.servers", "server1:port,server2:port").put("retries", "3").build();
        ExternalTransforms.ExternalConfigurationPayload build2 = ExternalTransforms.ExternalConfigurationPayload.newBuilder().putConfiguration("topic", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:string_utf8:v1").setPayload(ByteString.copyFrom(encodeString("topic"))).build()).putConfiguration("producer_config", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:iterable:v1").addCoderUrn("beam:coder:kv:v1").addCoderUrn("beam:coder:string_utf8:v1").addCoderUrn("beam:coder:string_utf8:v1").setPayload(ByteString.copyFrom(mapAsBytes(build))).build()).putConfiguration("key_serializer", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:string_utf8:v1").setPayload(ByteString.copyFrom(encodeString("org.apache.kafka.common.serialization.ByteArraySerializer"))).build()).putConfiguration("value_serializer", ExternalTransforms.ConfigValue.newBuilder().addCoderUrn("beam:coder:string_utf8:v1").setPayload(ByteString.copyFrom(encodeString("org.apache.kafka.common.serialization.LongSerializer"))).build()).build();
        Pipeline create = Pipeline.create();
        create.apply(Impulse.create()).apply(WithKeys.of("key"));
        RunnerApi.Pipeline proto = PipelineTranslation.toProto(create);
        ExpansionApi.ExpansionRequest build3 = ExpansionApi.ExpansionRequest.newBuilder().setComponents(proto.getComponents()).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").putInputs("input", (String) Iterables.getOnlyElement(((RunnerApi.PTransform) Iterables.getLast(proto.getComponents().getTransformsMap().values())).getOutputsMap().values())).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:external:java:kafka:write:v1").setPayload(build2.toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver testStreamObserver = new TestStreamObserver();
        expansionService.expand(build3, testStreamObserver);
        ExpansionApi.ExpansionResponse expansionResponse = (ExpansionApi.ExpansionResponse) testStreamObserver.result;
        RunnerApi.PTransform transform = expansionResponse.getTransform();
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.contains(new String[]{"test_namespacetest/Kafka ProducerRecord", "test_namespacetest/KafkaIO.WriteRecords"}));
        MatcherAssert.assertThat(Integer.valueOf(transform.getInputsCount()), Matchers.is(1));
        MatcherAssert.assertThat(Integer.valueOf(transform.getOutputsCount()), Matchers.is(0));
        DoFn doFn = ParDoTranslation.getDoFn(RunnerApi.ParDoPayload.parseFrom(expansionResponse.getComponents().getTransformsOrThrow(expansionResponse.getComponents().getTransformsOrThrow(expansionResponse.getComponents().getTransformsOrThrow(transform.getSubtransforms(1)).getSubtransforms(0)).getSubtransforms(0)).getSpec().getPayload()));
        MatcherAssert.assertThat(doFn, Matchers.instanceOf(KafkaWriter.class));
        KafkaIO.WriteRecords writeRecords = (KafkaIO.WriteRecords) Whitebox.getInternalState(doFn, "spec");
        MatcherAssert.assertThat(writeRecords.getProducerConfig(), Matchers.is(build));
        MatcherAssert.assertThat(writeRecords.getTopic(), Matchers.is("topic"));
        MatcherAssert.assertThat(writeRecords.getKeySerializer().getName(), Matchers.is("org.apache.kafka.common.serialization.ByteArraySerializer"));
        MatcherAssert.assertThat(writeRecords.getValueSerializer().getName(), Matchers.is("org.apache.kafka.common.serialization.LongSerializer"));
    }

    private static byte[] listAsBytes(List<String> list) throws IOException {
        IterableCoder of = IterableCoder.of(StringUtf8Coder.of());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        of.encode(list, byteArrayOutputStream);
        return byteArrayOutputStream.toByteArray();
    }

    private static byte[] mapAsBytes(Map<String, String> map) throws IOException {
        IterableCoder of = IterableCoder.of(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
        List list = (List) map.entrySet().stream().map(entry -> {
            return KV.of((String) entry.getKey(), (String) entry.getValue());
        }).collect(Collectors.toList());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        of.encode(list, byteArrayOutputStream);
        return byteArrayOutputStream.toByteArray();
    }

    private static byte[] encodeString(String str) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        StringUtf8Coder.of().encode(str, byteArrayOutputStream);
        return byteArrayOutputStream.toByteArray();
    }
}
