package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import com.alibaba.fastjson.JSON;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable;
import org.apache.beam.sdk.io.thrift.ThriftCoder;
import org.apache.beam.sdk.io.thrift.ThriftSchema;
import org.apache.beam.sdk.io.thrift.payloads.ItThriftMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.RowMessages;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.commons.csv.CSVFormat;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.class */
public class KafkaTableProviderIT {

    /** Pipeline under test; a fresh instance is created for every test method. */
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    /** Options view of {@link #pipeline}; assigned in {@code setUp()} and read by the static helpers. */
    private static KafkaOptions kafkaOptions;

    /** Payload-format specific record generator for the current parameterized run. */
    @Parameterized.Parameter
    public KafkaObjectProvider objectsProvider;

    /** Kafka topic used by the current parameterized run. */
    @Parameterized.Parameter(1)
    public String topic;

    /** Tag of the {@code confluentinc/cp-kafka} image started for this test class. */
    private static final String KAFKA_CONTAINER_VERSION = "5.5.2";

    /** Kafka broker container shared by all tests of this class. */
    @ClassRule
    public static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag(KAFKA_CONTAINER_VERSION));

    /** Schema of the rows produced to and expected from Kafka. */
    private static final Schema TEST_TABLE_SCHEMA = Schema.builder().addInt64Field("f_long").addInt32Field("f_int").addStringField("f_string").build();

    /**
     * Success flags keyed by pipeline options id; written by {@code StreamAssertEqual} and polled
     * by the streaming tests.
     */
    static final transient Map<Long, Boolean> FLAG = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT$FakeKvPair.class */
    /**
     * Wraps every incoming row in a {@link KV} with the constant key {@code "fake_key"} so that
     * a downstream stateful {@link DoFn} can consume it.
     */
    public static class FakeKvPair extends DoFn<Row, KV<String, Row>> {
        /** Emits the current element paired with the constant key. */
        @DoFn.ProcessElement
        public void processElement(DoFn<Row, KV<String, Row>>.ProcessContext processContext) {
            Row row = processContext.element();
            KV<String, Row> keyed = KV.of("fake_key", row);
            processContext.output(keyed);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT$KafkaAvroObjectProvider.class */
    /** Produces test records whose payload is the Avro encoding of the generated row. */
    private static class KafkaAvroObjectProvider extends KafkaObjectProvider {
        /** Row-to-Avro-bytes function built for the test table schema. */
        private final SimpleFunction<Row, byte[]> toBytesFn =
                AvroUtils.getRowToAvroBytesFunction(KafkaTableProviderIT.TEST_TABLE_SCHEMA);

        private KafkaAvroObjectProvider() {
            super();
        }

        /** Builds the record for index {@code i}: key {@code "k" + i}, Avro-encoded row as value. */
        @Override
        protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
            String topicName = KafkaTableProviderIT.kafkaOptions.getKafkaTopic();
            byte[] payload = this.toBytesFn.apply(KafkaTableProviderIT.generateRow(i));
            return new ProducerRecord<>(topicName, "k" + i, payload);
        }

        /** Returns the format name placed in the table properties. */
        @Override
        protected String getPayloadFormat() {
            return "avro";
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT$KafkaCsvObjectProvider.class */
    /** Produces test records whose payload is a UTF-8 encoded CSV line of the generated row. */
    private static class KafkaCsvObjectProvider extends KafkaObjectProvider {
        private KafkaCsvObjectProvider() {
            super();
        }

        /** Builds the record for index {@code i}: key {@code "k" + i}, CSV line bytes as value. */
        @Override
        protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
            String topicName = KafkaTableProviderIT.kafkaOptions.getKafkaTopic();
            String csvLine = BeamTableUtils.beamRow2CsvLine(KafkaTableProviderIT.generateRow(i), CSVFormat.DEFAULT);
            byte[] payload = csvLine.getBytes(StandardCharsets.UTF_8);
            return new ProducerRecord<>(topicName, "k" + i, payload);
        }

        /** Returns {@code null}, so no {@code format} entry is written to the table properties. */
        @Override
        protected String getPayloadFormat() {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT$KafkaJsonObjectProvider.class */
    /** Produces test records whose payload is a UTF-8 encoded JSON object. */
    private static class KafkaJsonObjectProvider extends KafkaObjectProvider {
        private KafkaJsonObjectProvider() {
            super();
        }

        /** Builds the record for index {@code i}: key {@code "k" + i}, JSON bytes as value. */
        @Override
        protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
            String topicName = KafkaTableProviderIT.kafkaOptions.getKafkaTopic();
            byte[] payload = createJson(i).getBytes(StandardCharsets.UTF_8);
            return new ProducerRecord<>(topicName, "k" + i, payload);
        }

        /** Returns the format name placed in the table properties. */
        @Override
        protected String getPayloadFormat() {
            return "json";
        }

        /** Renders the JSON document carrying the same three field values as {@code generateRow(i)}. */
        private String createJson(int i) {
            Integer longField = i;
            Integer intField = (i % 3) + 1;
            String stringField = "value" + i;
            return String.format("{\"f_long\": %s, \"f_int\": %s, \"f_string\": \"%s\"}", longField, intField, stringField);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT$KafkaObjectProvider.class */
    /**
     * Base class for the payload-format specific record generators used as test parameters.
     * Subclasses supply the Kafka record for a given index and the name of the payload format.
     */
    private static abstract class KafkaObjectProvider implements Serializable {
        private KafkaObjectProvider() {
        }

        /**
         * Builds the Kafka record for the given index.
         *
         * @param i index of the record to generate
         * @return the record to send to Kafka
         */
        protected abstract ProducerRecord<String, byte[]> generateProducerRecord(int i);

        /**
         * Returns the payload format name for the table properties.
         *
         * @return the format name, or {@code null} when no {@code format} entry should be written
         */
        protected abstract String getPayloadFormat();

        /**
         * Renders the table properties as a JSON object string.
         *
         * <p>The result is strict JSON: no trailing comma is written after the {@code format}
         * entry, so it does not rely on a lenient parser.
         *
         * @return {@code { }} when {@link #getPayloadFormat()} is {@code null}, otherwise an object
         *     containing a single {@code format} entry
         */
        protected String getKafkaPropertiesString() {
            // Read the format once; the subclass hook is not guaranteed to be cheap.
            String payloadFormat = getPayloadFormat();
            if (payloadFormat == null) {
                return "{ }";
            }
            return "{ \"format\" : \"" + payloadFormat + "\" }";
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT$KafkaOptions.class */
    /** Pipeline options carrying the Kafka bootstrap server address and topic used by this test. */
    public interface KafkaOptions extends PipelineOptions {
        /** Returns the Kafka bootstrap server address; defaults to {@code localhost:9092}. */
        @Description("Kafka server address")
        @Validation.Required
        @Default.String("localhost:9092")
        String getKafkaBootstrapServerAddress();

        /** Sets the Kafka bootstrap server address. */
        void setKafkaBootstrapServerAddress(String str);

        /** Returns the Kafka topic; defaults to {@code test}. */
        @Description("Kafka topic")
        @Validation.Required
        @Default.String("test")
        String getKafkaTopic();

        /** Sets the Kafka topic. */
        void setKafkaTopic(String str);
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT$KafkaProtoObjectProvider.class */
    /** Produces test records whose payload is the protobuf encoding of the generated row. */
    private static class KafkaProtoObjectProvider extends KafkaObjectProvider {
        /** Row-to-proto-bytes function for {@code PayloadMessages.ItMessage}. */
        private final SimpleFunction<Row, byte[]> toBytesFn =
                ProtoMessageSchema.getRowToProtoBytesFn(PayloadMessages.ItMessage.class);

        private KafkaProtoObjectProvider() {
            super();
        }

        /** Builds the record for index {@code i}: key {@code "k" + i}, proto-encoded row as value. */
        @Override
        protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
            String topicName = KafkaTableProviderIT.kafkaOptions.getKafkaTopic();
            byte[] payload = this.toBytesFn.apply(KafkaTableProviderIT.generateRow(i));
            return new ProducerRecord<>(topicName, "k" + i, payload);
        }

        /** Returns the format name placed in the table properties. */
        @Override
        protected String getPayloadFormat() {
            return "proto";
        }

        /** Renders table properties naming both the format and the proto message class. */
        @Override
        protected String getKafkaPropertiesString() {
            String protoClassName = PayloadMessages.ItMessage.class.getName();
            return "{ \"format\" : \"proto\",\"protoClass\": \"" + protoClassName + "\"}";
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT$KafkaThriftObjectProvider.class */
    /** Produces test records whose payload is the thrift encoding of the generated row. */
    private static class KafkaThriftObjectProvider extends KafkaObjectProvider {
        // Initializers run in declaration order; toBytesFn depends on the two fields above it.
        private final Class<ItThriftMessage> thriftClass = ItThriftMessage.class;
        private final TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
        private final SimpleFunction<Row, byte[]> toBytesFn =
                RowMessages.rowToBytesFn(
                        ThriftSchema.provider(),
                        TypeDescriptor.of(this.thriftClass),
                        ThriftCoder.of(this.thriftClass, this.protocolFactory));

        private KafkaThriftObjectProvider() {
            super();
        }

        /** Builds the record for index {@code i}: key {@code "k" + i}, thrift-encoded row as value. */
        @Override
        protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
            String topicName = KafkaTableProviderIT.kafkaOptions.getKafkaTopic();
            byte[] payload = this.toBytesFn.apply(KafkaTableProviderIT.generateRow(i));
            return new ProducerRecord<>(topicName, "k" + i, payload);
        }

        /** Renders table properties naming the format, the thrift class and the protocol factory class. */
        @Override
        protected String getKafkaPropertiesString() {
            String thriftClassName = this.thriftClass.getName();
            String factoryClassName = this.protocolFactory.getClass().getName();
            return "{ \"format\" : \"thrift\",\"thriftClass\": \"" + thriftClassName + "\", \"thriftProtocolFactoryClass\": \"" + factoryClassName + "\"}";
        }

        /** Returns the format name for this provider. */
        @Override
        protected String getPayloadFormat() {
            return "thrift";
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT$StreamAssertEqual.class */
    /**
     * Stateful {@link DoFn} that records every row it sees and, once at least as many elements
     * as expected have arrived and all expected rows are among them, sets the success flag for
     * the current pipeline options id in {@code FLAG}.
     */
    public static class StreamAssertEqual extends DoFn<KV<String, Row>, Void> {
        /** Rows that must all be observed before the flag is set. */
        private final Set<Row> expected;

        @DoFn.StateId("seenValues")
        private final StateSpec<BagState<Row>> seenRows = StateSpecs.bag(RowCoder.of(KafkaTableProviderIT.TEST_TABLE_SCHEMA));

        @DoFn.StateId("count")
        private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

        StreamAssertEqual(Set<Row> set) {
            this.expected = set;
        }

        /** Counts and stores the incoming row, then checks whether the expected set is complete. */
        @DoFn.ProcessElement
        public void process(DoFn<KV<String, Row>, Void>.ProcessContext processContext, @DoFn.StateId("seenValues") BagState<Row> bagState, @DoFn.StateId("count") ValueState<Integer> valueState) {
            Integer previousCount = valueState.read();
            int seenSoFar = (previousCount == null ? 0 : previousCount) + 1;
            valueState.write(seenSoFar);
            bagState.add(processContext.element().getValue());

            // Skip reading the bag until enough elements have arrived.
            if (seenSoFar < this.expected.size()) {
                return;
            }
            Set<Row> observed = StreamSupport.stream(bagState.read().spliterator(), false).collect(Collectors.toSet());
            if (!observed.containsAll(this.expected)) {
                return;
            }
            System.out.println("in second if");
            long optionsId = processContext.getPipelineOptions().getOptionsId();
            KafkaTableProviderIT.FLAG.put(optionsId, true);
        }
    }

    /**
     * Supplies the test parameters: one (record provider, topic name) pair per payload format.
     *
     * @return the parameter rows, in JSON, Avro, proto, CSV, thrift order
     */
    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        Object[][] cases = {
            {new KafkaJsonObjectProvider(), "json_topic"},
            {new KafkaAvroObjectProvider(), "avro_topic"},
            {new KafkaProtoObjectProvider(), "proto_topic"},
            {new KafkaCsvObjectProvider(), "csv_topic"},
            {new KafkaThriftObjectProvider(), "thrift_topic"},
        };
        return Arrays.asList(cases);
    }

    /**
     * Points the shared {@code kafkaOptions} at the current topic and at the container's
     * bootstrap server, then verifies that only a single server address was reported.
     */
    @Before
    public void setUp() {
        KafkaOptions options = this.pipeline.getOptions().as(KafkaOptions.class);
        kafkaOptions = options;
        options.setKafkaTopic(this.topic);

        String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
        options.setKafkaBootstrapServerAddress(bootstrapServers);

        // A comma would mean a list of servers.
        boolean singleServer = !bootstrapServers.contains(",");
        Preconditions.checkArgument(singleServer, "This integration test expects exactly one bootstrap server.");
    }

    /**
     * Builds the table LOCATION string in the form {@code <bootstrap server>/<topic>}.
     *
     * @return the location derived from the current {@code kafkaOptions}
     */
    private static String buildLocation() {
        String server = kafkaOptions.getKafkaBootstrapServerAddress();
        String topicName = kafkaOptions.getKafkaTopic();
        return String.join("/", server, topicName);
    }

    /**
     * Checks rate estimation: after a second batch is produced with a shorter delay between
     * records, the computed rate must be higher than the one measured after the first batch.
     */
    @Test
    public void testFake2() throws BeamKafkaTable.NoEstimationException {
        Table tableDefinition =
                Table.builder()
                        .name("kafka_table")
                        .comment("kafka table")
                        .location(buildLocation())
                        .schema(TEST_TABLE_SCHEMA)
                        .type("kafka")
                        .properties(JSON.parseObject(this.objectsProvider.getKafkaPropertiesString()))
                        .build();
        BeamKafkaTable kafkaTable = new KafkaTableProvider().buildBeamSqlTable(tableDefinition);

        // 100 records, 20 ms apart.
        produceSomeRecordsWithDelay(100, 20);
        double slowRate = kafkaTable.computeRate(20);

        // 100 records, 10 ms apart.
        produceSomeRecordsWithDelay(100, 10);
        double fastRate = kafkaTable.computeRate(20);

        Assert.assertTrue(fastRate > slowRate);
    }

    /**
     * Reads three records from Kafka through a flat {@code kafka_table} definition and waits
     * for {@link StreamAssertEqual} to report that all expected rows were seen.
     *
     * <p>The pipeline is started without blocking, three records are produced, and the success
     * flag is polled up to 200 times at 90 ms intervals before the test fails.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping
     */
    @Test
    public void testFake() throws InterruptedException {
        this.pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
        String createTableDdl = String.format("CREATE EXTERNAL TABLE kafka_table(\nf_long BIGINT NOT NULL, \nf_int INTEGER NOT NULL, \nf_string VARCHAR NOT NULL \n) \nTYPE 'kafka' \nLOCATION '%s'\nTBLPROPERTIES '%s'", buildLocation(), this.objectsProvider.getKafkaPropertiesString());
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new TableProvider[]{new KafkaTableProvider()});
        sqlEnv.executeDdl(createTableDdl);

        // Typed as PCollection<Row>: with a raw PCollection the generic apply(...) return type is
        // erased and the chained setCoder(...) call below does not type-check.
        PCollection<Row> rows = BeamSqlRelUtils.toPCollection(this.pipeline, sqlEnv.parseQuery("SELECT * FROM kafka_table"));
        rows.apply(ParDo.of(new FakeKvPair()))
                .setCoder(KvCoder.of(StringUtf8Coder.of(), RowCoder.of(TEST_TABLE_SCHEMA)))
                .apply("waitForSuccess", ParDo.of(new StreamAssertEqual(ImmutableSet.of(generateRow(0), generateRow(1), generateRow(2)))));
        rows.apply(logRecords(""));

        this.pipeline.run();
        // NOTE(review): presumably gives the non-blocking pipeline time to start consuming
        // before records are produced — confirm.
        TimeUnit.SECONDS.sleep(4L);
        produceSomeRecords(3);

        long optionsId = this.pipeline.getOptions().getOptionsId();
        for (int attempt = 0; attempt < 200; attempt++) {
            if (FLAG.getOrDefault(optionsId, false)) {
                return;
            }
            TimeUnit.MILLISECONDS.sleep(90L);
        }
        Assert.fail("Expected rows were not all observed on topic '" + this.topic + "' within the polling window");
    }

    /**
     * Same scenario as the flat-table streaming test, but the table is declared with a
     * {@code headers} column and a nested {@code payload} row, and the query selects the three
     * payload fields. Skipped for the CSV topic.
     *
     * <p>The pipeline is started without blocking, three records are produced, and the success
     * flag is polled up to 200 times at 90 ms intervals before the test fails.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping
     */
    @Test
    public void testFakeNested() throws InterruptedException {
        Assume.assumeFalse("csv_topic".equals(this.topic));
        this.pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
        String createTableDdl = String.format("CREATE EXTERNAL TABLE kafka_table(\nheaders ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,payload ROW<f_long BIGINT NOT NULL, \nf_int INTEGER NOT NULL, \nf_string VARCHAR NOT NULL \n>) \nTYPE 'kafka' \nLOCATION '%s'\nTBLPROPERTIES '%s'", buildLocation(), this.objectsProvider.getKafkaPropertiesString());
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new TableProvider[]{new KafkaTableProvider()});
        sqlEnv.executeDdl(createTableDdl);

        // Typed as PCollection<Row>: with a raw PCollection the generic apply(...) return type is
        // erased and the chained setCoder(...) call below does not type-check.
        PCollection<Row> rows = BeamSqlRelUtils.toPCollection(this.pipeline, sqlEnv.parseQuery("SELECT kafka_table.payload.f_long, kafka_table.payload.f_int, kafka_table.payload.f_string FROM kafka_table"));
        rows.apply(ParDo.of(new FakeKvPair()))
                .setCoder(KvCoder.of(StringUtf8Coder.of(), RowCoder.of(TEST_TABLE_SCHEMA)))
                .apply("waitForSuccess", ParDo.of(new StreamAssertEqual(ImmutableSet.of(generateRow(0), generateRow(1), generateRow(2)))));
        rows.apply(logRecords(""));

        this.pipeline.run();
        // NOTE(review): presumably gives the non-blocking pipeline time to start consuming
        // before records are produced — confirm.
        TimeUnit.SECONDS.sleep(4L);
        produceSomeRecords(3);

        long optionsId = this.pipeline.getOptions().getOptionsId();
        for (int attempt = 0; attempt < 200; attempt++) {
            if (FLAG.getOrDefault(optionsId, false)) {
                return;
            }
            TimeUnit.MILLISECONDS.sleep(90L);
        }
        Assert.fail("Expected nested-payload rows were not all observed on topic '" + this.topic + "' within the polling window");
    }

    /**
     * Creates a transform that prints each row's values, followed by the given suffix, to
     * standard output.
     *
     * @param str text appended after the row values on every printed line
     * @return a {@link MapElements} transform mapping every row to {@code null}
     */
    private static MapElements<Row, Void> logRecords(final String str) {
        SimpleFunction<Row, Void> printer = new SimpleFunction<Row, Void>() {
            @Override
            public Void apply(Row row) {
                System.out.println(row.getValues() + str);
                return null;
            }
        };
        return MapElements.via(printer);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the expected row for index {@code i} using the test table schema.
     *
     * @param i record index
     * @return a row with {@code f_long = i}, {@code f_int = (i % 3) + 1} and
     *     {@code f_string = "value" + i}
     */
    public static Row generateRow(int i) {
        Long longField = Long.valueOf(i);
        Integer intField = (i % 3) + 1;
        String stringField = "value" + i;
        return Row.withSchema(TEST_TABLE_SCHEMA).addValues(longField, intField, stringField).build();
    }

    /**
     * Sends records with indices {@code 0 .. i-1} to Kafka using the current record provider.
     *
     * <p>The producer is managed by try-with-resources so that it is closed even when sending
     * or flushing throws.
     *
     * @param i number of records to produce
     */
    private void produceSomeRecords(int i) {
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps())) {
            for (int index = 0; index < i; index++) {
                producer.send(this.objectsProvider.generateProducerRecord(index));
            }
            producer.flush();
        }
    }

    /**
     * Sends records with indices {@code 0 .. i-1} to Kafka, sleeping after each send.
     *
     * <p>The producer is managed by try-with-resources so that it is closed even when sending,
     * sleeping or flushing throws.
     *
     * @param i number of records to produce
     * @param i2 delay in milliseconds after each record
     * @throws RuntimeException if the thread is interrupted while sleeping; the interrupt flag
     *     is restored before the exception is thrown
     */
    private void produceSomeRecordsWithDelay(int i, int i2) {
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps())) {
            for (int index = 0; index < i; index++) {
                producer.send(this.objectsProvider.generateProducerRecord(index));
                try {
                    TimeUnit.MILLISECONDS.sleep(i2);
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still see the interrupt.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Could not wait for producing", e);
                }
            }
            producer.flush();
        }
    }

    /**
     * Assembles the Kafka producer configuration: String keys, byte-array values, and the
     * bootstrap server taken from the current {@code kafkaOptions}.
     *
     * @return a new {@link Properties} instance with the producer settings
     */
    private Properties producerProps() {
        Properties config = new Properties();

        // Connection and serialization.
        config.put("bootstrap.servers", kafkaOptions.getKafkaBootstrapServerAddress());
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Delivery settings.
        config.put("acks", "all");
        // NOTE(review): set alongside "acks" with a different value; looks like a legacy key —
        // confirm whether the producer still reads it.
        config.put("request.required.acks", "1");
        config.put("retries", 0);

        // Buffering: 32 MiB buffer, 1 ms linger.
        config.put("buffer.memory", 32 * 1024 * 1024);
        config.put("linger.ms", 1);
        return config;
    }
}
