/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs;

import io.confluent.connect.hdfs.DataFileReader;
import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorTestBase;
import io.confluent.connect.hdfs.filter.TopicPartitionCommittedFileFilter;
import io.confluent.connect.hdfs.partitioner.Partitioner;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.SchemaProjector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

public class TestWithMiniDFSCluster
extends HdfsSinkConnectorTestBase {
    protected static FileSystem fs;
    protected static MiniDFSCluster cluster;
    protected DataFileReader dataFileReader;
    protected Partitioner partitioner;
    protected String extension;
    protected String zeroPadFormat = "%010d";
    private Map<String, String> localProps = new HashMap<String, String>();

    @Before
    public void setup() throws IOException {
        cluster = TestWithMiniDFSCluster.createDFSCluster();
        fs = cluster.getFileSystem();
    }

    @After
    public void cleanup() throws IOException {
        if (fs != null) {
            fs.close();
        }
        if (cluster != null) {
            cluster.shutdown(true);
        }
    }

    @Override
    protected Map<String, String> createProps() {
        Map<String, String> props = super.createProps();
        this.url = "hdfs://" + cluster.getNameNode().getClientNamenodeAddress();
        this.localProps.put("hdfs.url", this.url);
        this.localProps.put("store.url", this.url);
        props.putAll(this.localProps);
        return props;
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
    }

    @Override
    @After
    public void tearDown() throws Exception {
        if (cluster.isDataNodeUp() && fs.exists(new Path("/")) && fs.isDirectory(new Path("/"))) {
            for (FileStatus file : fs.listStatus(new Path("/"))) {
                if (file.isDirectory()) {
                    fs.delete(file.getPath(), true);
                    continue;
                }
                fs.delete(file.getPath(), false);
            }
        }
    }

    protected List<SinkRecord> createSinkRecords(int size) {
        return this.createSinkRecords(size, 0L);
    }

    protected List<SinkRecord> createSinkRecords(int size, long startOffset) {
        return this.createSinkRecords(size, startOffset, Collections.singleton(new TopicPartition("test-topic", 12)));
    }

    protected List<SinkRecord> createSinkRecords(int size, long startOffset, Set<TopicPartition> partitions) {
        Schema schema = this.createSchema();
        Struct record = this.createRecord(schema);
        ArrayList<Struct> same = new ArrayList<Struct>();
        for (int i = 0; i < size; ++i) {
            same.add(record);
        }
        return this.createSinkRecords(same, schema, startOffset, partitions);
    }

    protected List<SinkRecord> createSinkRecords(List<Struct> records, Schema schema) {
        return this.createSinkRecords(records, schema, 0L, Collections.singleton(new TopicPartition("test-topic", 12)));
    }

    protected List<SinkRecord> createSinkRecords(List<Struct> records, Schema schema, long startOffset, Set<TopicPartition> partitions) {
        String key = "key";
        ArrayList<SinkRecord> sinkRecords = new ArrayList<SinkRecord>();
        for (TopicPartition tp : partitions) {
            long offset = startOffset;
            for (Struct record : records) {
                sinkRecords.add(new SinkRecord("test-topic", tp.partition(), Schema.STRING_SCHEMA, (Object)key, schema, (Object)record, offset++));
            }
        }
        return sinkRecords;
    }

    protected List<SinkRecord> createSinkRecordsNoVersion(int size, long startOffset) {
        String key = "key";
        Schema schemaNoVersion = SchemaBuilder.struct().name("record").field("boolean", Schema.BOOLEAN_SCHEMA).field("int", Schema.INT32_SCHEMA).field("long", Schema.INT64_SCHEMA).field("float", Schema.FLOAT32_SCHEMA).field("double", Schema.FLOAT64_SCHEMA).build();
        Struct recordNoVersion = new Struct(schemaNoVersion);
        recordNoVersion.put("boolean", (Object)true).put("int", (Object)12).put("long", (Object)12L).put("float", (Object)Float.valueOf(12.2f)).put("double", (Object)12.2);
        ArrayList<SinkRecord> sinkRecords = new ArrayList<SinkRecord>();
        for (long offset = startOffset; offset < startOffset + (long)size; ++offset) {
            sinkRecords.add(new SinkRecord("test-topic", 12, Schema.STRING_SCHEMA, (Object)key, schemaNoVersion, (Object)recordNoVersion, offset));
        }
        return sinkRecords;
    }

    protected List<SinkRecord> createSinkRecordsWithAlternatingSchemas(int size, long startOffset) {
        String key = "key";
        Schema schema = this.createSchema();
        Struct record = this.createRecord(schema);
        Schema newSchema = this.createNewSchema();
        Struct newRecord = this.createNewRecord(newSchema);
        int limit = size / 2 * 2;
        boolean remainder = size % 2 > 0;
        ArrayList<SinkRecord> sinkRecords = new ArrayList<SinkRecord>();
        for (long offset = startOffset; offset < startOffset + (long)limit; ++offset) {
            sinkRecords.add(new SinkRecord("test-topic", 12, Schema.STRING_SCHEMA, (Object)key, schema, (Object)record, offset));
            sinkRecords.add(new SinkRecord("test-topic", 12, Schema.STRING_SCHEMA, (Object)key, newSchema, (Object)newRecord, ++offset));
        }
        if (remainder) {
            sinkRecords.add(new SinkRecord("test-topic", 12, Schema.STRING_SCHEMA, (Object)key, schema, (Object)record, startOffset + (long)size - 1L));
        }
        return sinkRecords;
    }

    protected List<SinkRecord> createSinkRecordsInterleaved(int size, long startOffset, Set<TopicPartition> partitions) {
        String key = "key";
        Schema schema = this.createSchema();
        Struct record = this.createRecord(schema);
        ArrayList<SinkRecord> sinkRecords = new ArrayList<SinkRecord>();
        long offset = startOffset;
        long total = 0L;
        while (total < (long)size) {
            for (TopicPartition tp : partitions) {
                sinkRecords.add(new SinkRecord("test-topic", tp.partition(), Schema.STRING_SCHEMA, (Object)key, schema, (Object)record, offset));
                if (++total < (long)size) continue;
                break;
            }
            ++offset;
        }
        return sinkRecords;
    }

    protected List<SinkRecord> createSinkRecordsWithTimestamp(List<Struct> records, Schema schema, int startOffset, long startTime, long timeStep) {
        String key = "key";
        ArrayList<SinkRecord> sinkRecords = new ArrayList<SinkRecord>();
        int i = 0;
        int offset = startOffset;
        while (i < records.size()) {
            sinkRecords.add(new SinkRecord("test-topic", 12, Schema.STRING_SCHEMA, (Object)key, schema, (Object)records.get(i), (long)offset, Long.valueOf(startTime + (long)offset * timeStep), TimestampType.CREATE_TIME));
            ++i;
            ++offset;
        }
        return sinkRecords;
    }

    protected String getDirectory() {
        return this.getDirectory("test-topic", 12);
    }

    protected String getDirectory(String topic, int partition) {
        String encodedPartition = "partition=" + String.valueOf(partition);
        return this.partitioner.generatePartitionedPath(topic, encodedPartition);
    }

    protected void verify(List<SinkRecord> sinkRecords, long[] validOffsets) throws IOException {
        this.verify(sinkRecords, validOffsets, Collections.singleton(new TopicPartition("test-topic", 12)), false);
    }

    protected void verify(List<SinkRecord> sinkRecords, long[] validOffsets, Set<TopicPartition> partitions) throws IOException {
        this.verify(sinkRecords, validOffsets, partitions, false);
    }

    protected void verify(List<SinkRecord> sinkRecords, long[] validOffsets, Set<TopicPartition> partitions, boolean skipFileListing) throws IOException {
        if (!skipFileListing) {
            this.verifyFileListing(validOffsets, partitions);
        }
        for (TopicPartition tp : partitions) {
            int j = 0;
            for (int i = 1; i < validOffsets.length; ++i) {
                long startOffset = validOffsets[i - 1];
                long endOffset = validOffsets[i] - 1L;
                String topicsDir = (String)this.topicsDir.get(tp.topic());
                String filename = FileUtils.committedFileName((String)this.url, (String)topicsDir, (String)this.getDirectory(tp.topic(), tp.partition()), (TopicPartition)tp, (long)startOffset, (long)endOffset, (String)this.extension, (String)this.zeroPadFormat);
                Path path = new Path(filename);
                Collection<Object> records = this.dataFileReader.readData(this.connectorConfig.getHadoopConfiguration(), path);
                long size = endOffset - startOffset + 1L;
                Assert.assertEquals((long)size, (long)records.size());
                this.verifyContents(sinkRecords, j, records);
                j = (int)((long)j + size);
            }
        }
    }

    protected List<String> getExpectedFiles(long[] validOffsets, TopicPartition tp) {
        ArrayList<String> expectedFiles = new ArrayList<String>();
        for (int i = 1; i < validOffsets.length; ++i) {
            long startOffset = validOffsets[i - 1];
            long endOffset = validOffsets[i] - 1L;
            String topicsDir = (String)this.topicsDir.get(tp.topic());
            expectedFiles.add(FileUtils.committedFileName((String)this.url, (String)topicsDir, (String)this.getDirectory(tp.topic(), tp.partition()), (TopicPartition)tp, (long)startOffset, (long)endOffset, (String)this.extension, (String)this.zeroPadFormat));
        }
        return expectedFiles;
    }

    protected void verifyFileListing(long[] validOffsets, Set<TopicPartition> partitions) throws IOException {
        for (TopicPartition tp : partitions) {
            this.verifyFileListing(this.getExpectedFiles(validOffsets, tp), tp);
        }
    }

    protected void verifyFileListing(List<String> expectedFiles, TopicPartition tp) throws IOException {
        FileStatus[] statuses = new FileStatus[]{};
        try {
            String topicsDir = (String)this.topicsDir.get(tp.topic());
            statuses = fs.listStatus(new Path(FileUtils.directoryName((String)this.url, (String)topicsDir, (String)this.getDirectory(tp.topic(), tp.partition()))), (PathFilter)new TopicPartitionCommittedFileFilter(tp));
        }
        catch (FileNotFoundException topicsDir) {
            // empty catch block
        }
        ArrayList<String> actualFiles = new ArrayList<String>();
        for (FileStatus status : statuses) {
            actualFiles.add(status.getPath().toString());
        }
        Collections.sort(actualFiles);
        Collections.sort(expectedFiles);
        Assert.assertThat(actualFiles, (Matcher)CoreMatchers.is(expectedFiles));
    }

    protected void verifyContents(List<SinkRecord> expectedRecords, int startIndex, Collection<Object> records) {
        Schema expectedSchema = null;
        for (Object avroRecord : records) {
            if (expectedSchema == null) {
                expectedSchema = expectedRecords.get(startIndex).valueSchema();
            }
            Object expectedValue = SchemaProjector.project((Schema)expectedRecords.get(startIndex).valueSchema(), (Object)expectedRecords.get(startIndex++).value(), (Schema)expectedSchema);
            Assert.assertEquals((Object)this.avroData.fromConnectData(expectedSchema, expectedValue), (Object)avroRecord);
        }
    }

    private static MiniDFSCluster createDFSCluster() throws IOException {
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration()).hosts(new String[]{"localhost", "localhost", "localhost"}).nameNodePort(9001).numDataNodes(3).build();
        cluster.waitActive();
        return cluster;
    }

    protected int getFileSystemCacheSize() throws Exception {
        Field cacheField = FileSystem.class.getDeclaredField("CACHE");
        cacheField.setAccessible(true);
        Object cache = cacheField.get(Object.class);
        Field cacheMapField = cache.getClass().getDeclaredField("map");
        cacheMapField.setAccessible(true);
        Map cacheMap = (Map)cacheMapField.get(cache);
        cacheField.setAccessible(false);
        cacheMapField.setAccessible(false);
        return cacheMap.size();
    }
}

