/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.io.kafka.KafkaTimestampType;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.Rule;
import org.junit.Test;

public abstract class BeamKafkaTableTest {
    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    protected abstract BeamKafkaTable getBeamKafkaTable();

    protected abstract byte[] generateEncodedPayload(int var1) throws Exception;

    protected abstract Row generateRow(int var1);

    @Test
    public void testRecorderDecoder() throws Exception {
        BeamKafkaTable kafkaTable = this.getBeamKafkaTable();
        PCollection result = (PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)this.generateEncodedPayload(1), (Object[])new byte[][]{this.generateEncodedPayload(2)}))).apply((PTransform)MapElements.via((SimpleFunction)new BytesToRecord()))).setCoder((Coder)KafkaRecordCoder.of((Coder)ByteArrayCoder.of(), (Coder)ByteArrayCoder.of())).apply(kafkaTable.getPTransformForInput());
        PAssert.that((PCollection)result).containsInAnyOrder((Object[])new Row[]{this.generateRow(1), this.generateRow(2)});
        this.pipeline.run();
    }

    @Test
    public void testRecorderEncoder() {
        BeamKafkaTable kafkaTable = this.getBeamKafkaTable();
        PCollection result = (PCollection)((PCollection)((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)this.generateRow(1), (Object[])new Row[]{this.generateRow(2)}))).apply(kafkaTable.getPTransformForOutput())).setCoder((Coder)ProducerRecordCoder.of((Coder)ByteArrayCoder.of(), (Coder)ByteArrayCoder.of())).apply((PTransform)MapElements.via((SimpleFunction)new ProducerToRecord()))).setCoder((Coder)KafkaRecordCoder.of((Coder)ByteArrayCoder.of(), (Coder)ByteArrayCoder.of())).apply(kafkaTable.getPTransformForInput());
        PAssert.that((PCollection)result).containsInAnyOrder((Object[])new Row[]{this.generateRow(1), this.generateRow(2)});
        this.pipeline.run();
    }

    static class ProducerToRecord
    extends SimpleFunction<ProducerRecord<byte[], byte[]>, KafkaRecord<byte[], byte[]>> {
        ProducerToRecord() {
        }

        public KafkaRecord<byte[], byte[]> apply(ProducerRecord<byte[], byte[]> record) {
            return new KafkaRecord(record.topic(), record.partition() != null ? record.partition() : 0, 0L, 0L, KafkaTimestampType.LOG_APPEND_TIME, record.headers(), (Object)((byte[])record.key()), (Object)((byte[])record.value()));
        }
    }

    private static class BytesToRecord
    extends SimpleFunction<byte[], KafkaRecord<byte[], byte[]>> {
        private BytesToRecord() {
        }

        public KafkaRecord<byte[], byte[]> apply(byte[] bytes) {
            return new KafkaRecord("abc", 0, 0L, 0L, KafkaTimestampType.LOG_APPEND_TIME, (Headers)new RecordHeaders(), KV.of((Object)new byte[0], (Object)bytes));
        }
    }
}

