/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs.json;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.connect.hdfs.DataWriter;
import io.confluent.connect.hdfs.TestWithMiniDFSCluster;
import io.confluent.connect.hdfs.json.JsonDataFileReader;
import io.confluent.connect.hdfs.json.JsonFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link DataWriter} when the connector is configured with {@link JsonFormat}.
 *
 * <p>Records are written through a {@code DataWriter} and then checked with the
 * {@code verify(...)} helper inherited from {@link TestWithMiniDFSCluster}, which calls back
 * into {@link #verifyContents(List, int, Collection)} to compare individual records.
 */
public class DataWriterJsonTest extends TestWithMiniDFSCluster {
    /** Converter used to turn expected {@link Struct} values into JSON bytes for comparison. */
    private JsonConverter converter;
    /** Parses the converter output into plain Java objects. */
    protected final ObjectMapper mapper = new ObjectMapper();

    /**
     * Runs the base-class set-up, then prepares the JSON specific fixtures: a
     * {@link JsonConverter} configured with {@code schemas.enable=false}, a
     * {@link JsonDataFileReader} and the {@code .json} file extension.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        converter = new JsonConverter();
        converter.configure(Collections.singletonMap("schemas.enable", "false"), false);
        dataFileReader = new JsonDataFileReader();
        extension = ".json";
    }

    /**
     * Returns the base-class connector properties with {@code format.class} set to
     * {@link JsonFormat}.
     */
    @Override
    protected Map<String, String> createProps() {
        Map<String, String> props = super.createProps();
        props.put("format.class", JsonFormat.class.getName());
        return props;
    }

    /**
     * Writes records produced by the inherited {@code createSinkRecords} helper and verifies
     * the result against the offsets {@code 0, 3, 6}.
     */
    @Test
    public void testWithSchema() throws Exception {
        DataWriter hdfsWriter = new DataWriter(connectorConfig, (SinkTaskContext) context, avroData);
        partitioner = hdfsWriter.getPartitioner();
        hdfsWriter.recover(TOPIC_PARTITION);

        List<SinkRecord> sinkRecords = createSinkRecords(7, 0L, context.assignment());
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();

        // The meaning of these offsets is defined by the inherited verify(...) helper.
        long[] validOffsets = {0L, 3L, 6L};
        verify(sinkRecords, validOffsets, context.assignment());
    }

    /**
     * Writes schemaless string records (seven per assigned partition) and verifies the result
     * against the offsets {@code 0, 3, 6}.
     */
    @Test
    public void testNoSchema() throws Exception {
        DataWriter hdfsWriter = new DataWriter(connectorConfig, (SinkTaskContext) context, avroData);
        partitioner = hdfsWriter.getPartitioner();
        hdfsWriter.recover(TOPIC_PARTITION);

        List<SinkRecord> sinkRecords =
            createJsonRecordsWithoutSchema(7 * context.assignment().size(), 0L, context.assignment());
        hdfsWriter.write(sinkRecords);
        hdfsWriter.close();
        hdfsWriter.stop();

        // The meaning of these offsets is defined by the inherited verify(...) helper.
        long[] validOffsets = {0L, 3L, 6L};
        verify(sinkRecords, validOffsets, context.assignment());
    }

    /**
     * Builds {@code size} sink records whose key and value carry no Connect schema.
     *
     * <p>Records are handed out round-robin: one record per partition at {@code startOffset},
     * then one per partition at {@code startOffset + 1}, and so on, stopping as soon as
     * {@code size} records exist. Every record has the key {@code "key"} and the same string
     * value.
     *
     * @param size number of records to create; a value {@code <= 0} yields an empty list
     * @param startOffset Kafka offset assigned to the first round of records
     * @param partitions partitions to spread the records over
     * @return the created records, in creation order
     * @throws IllegalArgumentException if {@code size} is positive but {@code partitions} is
     *     empty (no record could ever be produced)
     */
    protected List<SinkRecord> createJsonRecordsWithoutSchema(int size, long startOffset, Set<TopicPartition> partitions) {
        // Without this guard the outer loop below would never make progress and spin forever.
        if (size > 0 && partitions.isEmpty()) {
            throw new IllegalArgumentException(
                "Cannot create " + size + " records without any partition");
        }

        String key = "key";
        int ibase = 12;
        // NOTE: this text is not well-formed JSON (the stringField value is unquoted and the
        // braces are unbalanced). It is kept byte-for-byte because verifyContents compares the
        // value as a plain string and never parses it. The text does not depend on the
        // partition, so it is built once up front.
        String record = "{\"schema\":{\"type\":\"struct\",\"fields\":[ "
            + "{\"type\":\"boolean\",\"optional\":true,\"field\":\"booleanField\"},"
            + "{\"type\":\"int32\",\"optional\":true,\"field\":\"intField\"},"
            + "{\"type\":\"int64\",\"optional\":true,\"field\":\"longField\"},"
            + "{\"type\":\"string\",\"optional\":false,\"field\":\"stringField\"}],"
            + "\"payload\":{\"booleanField\":\"true\",\"intField\":" + ibase
            + ",\"longField\":" + (long) ibase
            + ",\"stringField\":str" + ibase
            + "}}";

        List<SinkRecord> sinkRecords = new ArrayList<>();
        long offset = startOffset;
        int produced = 0;
        while (produced < size) {
            for (TopicPartition tp : partitions) {
                sinkRecords.add(new SinkRecord("test-topic", tp.partition(), null, key, null, record, offset));
                if (++produced >= size) {
                    break;
                }
            }
            ++offset;
        }
        return sinkRecords;
    }

    /**
     * Asserts that each entry of {@code records} equals the corresponding expected record
     * value, starting at {@code startIndex} in {@code expectedRecords}.
     *
     * <p>An expected {@link Struct} value is first serialized with the JSON converter and
     * parsed back into a plain Java object so that it can be compared with what was read from
     * the file; any other expected value is compared as is.
     *
     * @param expectedRecords the records that were written
     * @param startIndex index in {@code expectedRecords} of the first record to compare
     * @param records the records read back
     * @throws RuntimeException wrapping the {@link IOException} if the converter output cannot
     *     be parsed
     */
    @Override
    protected void verifyContents(List<SinkRecord> expectedRecords, int startIndex, Collection<Object> records) {
        int index = startIndex;
        for (Object jsonRecord : records) {
            SinkRecord expectedRecord = expectedRecords.get(index++);
            Object expectedValue = expectedRecord.value();
            if (expectedValue instanceof Struct) {
                byte[] expectedBytes =
                    converter.fromConnectData("test-topic", expectedRecord.valueSchema(), expectedValue);
                try {
                    expectedValue = mapper.readValue(expectedBytes, Object.class);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            Assert.assertEquals(expectedValue, jsonRecord);
        }
    }
}

