package org.apache.gobblin.qualitychecker.row;

import com.google.common.base.Strings;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicy;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.FlushControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.class */
/**
 * Applies a list of {@link RowLevelPolicy} instances to records and reacts to failures.
 *
 * <p>A failed policy of type {@code FAIL} raises a {@link RuntimeException}; a failed policy of
 * type {@code ERR_FILE} may have the offending record written to an error file, subject to the
 * sampler held by this checker. Used as a stream processor, records that fail a policy are
 * acked and filtered out of the stream.
 *
 * @param <S> schema type carried through {@code processStream} unchanged
 * @param <D> record type carried through {@code processStream} unchanged
 */
public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, RecordStreamProcessor<S, S, D, D>, SpeculativeAttemptAwareConstruct {
    private static final Logger log = LoggerFactory.getLogger(RowLevelPolicyChecker.class);
    /** Policies evaluated for every record, in list order. */
    private final List<RowLevelPolicy> list;
    /** Identifier appended to the error-file name when it is neither null nor empty. */
    private final String stateId;
    /** File system passed to the error-file writer when it is created. */
    private final FileSystem fs;
    /** True once {@link #writer} has been opened and written to, until {@link #close()} resets it. */
    private boolean errFileOpen;
    /** Decides which failed records of {@code ERR_FILE} policies are actually written out. */
    private final FrontLoadedSampler sampler;
    /** Error-file writer; created lazily on the first sampled failure, so it may be null. */
    private RowLevelErrFileWriter writer;
    /** Accumulates the per-policy results of every record passed through {@code processStream}. */
    private final RowLevelPolicyCheckResults results;
    /** When true, error-file names get a timestamp suffix and error-file policies do not block speculative attempts. */
    private boolean allowSpeculativeExecWhenWriteErrFile;
    /** State property key for {@link #allowSpeculativeExecWhenWriteErrFile}; the constructor defaults it to true. */
    static final String ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY = "allowSpeculativeExecutionWithErrFilePolicy";

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sampler that accepts every error record until a target count is reached and then accepts
     * records at geometrically growing intervals.
     *
     * <p>Error records are numbered from 0 in arrival order. The sampler keeps the index of the
     * next record to accept; each acceptance moves that threshold forward by one while it is
     * below {@code targetRecordsAccepted}, and to {@code decayFactor * index + 1} afterwards.
     * A non-positive target disables sampling: no record is ever accepted.
     */
    @ThreadSafe
    public static class FrontLoadedSampler {
        private final long targetRecordsAccepted;
        private final double decayFactor;
        /** Number of error records seen so far; also the index handed to the next caller. */
        private final AtomicLong errorRecords = new AtomicLong();
        /** Index of the next error record that will be accepted. */
        private final AtomicLong nextErrorRecordWritten = new AtomicLong();

        /**
         * @param j number of leading error records to accept unconditionally; values {@code <= 0}
         *          disable acceptance entirely
         * @param d growth factor for the gap between accepted records once the target has been
         *          reached; values below 1.0 are raised to 1.0
         */
        public FrontLoadedSampler(long j, double d) {
            this.targetRecordsAccepted = j;
            this.decayFactor = Math.max(1.0d, d);
            if (this.targetRecordsAccepted <= 0) {
                // Record indices never reach Long.MAX_VALUE in practice, so nothing is accepted.
                this.nextErrorRecordWritten.set(Long.MAX_VALUE);
            }
        }

        /**
         * Registers one more error record and reports whether it should be written out.
         *
         * @return {@code true} if this record is the one the sampler was waiting for
         */
        public boolean acceptNext() {
            long recordIndex = this.errorRecords.getAndIncrement();
            // The successor must be derived from this caller's own index (the CAS expected value).
            // Re-reading the shared threshold here could pick up a stale value, letting the CAS
            // succeed without advancing the threshold and leaving every later caller spinning.
            long successor = computeNextErrorRecordWritten(recordIndex);
            // recordIndex > threshold means a caller with a lower index has not advanced the
            // threshold yet; retry until it does, then either win the CAS or fall below it.
            while (recordIndex >= this.nextErrorRecordWritten.get()) {
                if (this.nextErrorRecordWritten.compareAndSet(recordIndex, successor)) {
                    return true;
                }
            }
            return false;
        }

        /** Returns the index of the record to accept after the record at {@code acceptedIndex}. */
        private long computeNextErrorRecordWritten(long acceptedIndex) {
            if (acceptedIndex < this.targetRecordsAccepted) {
                return acceptedIndex + 1;
            }
            long scaled = (long) (this.decayFactor * acceptedIndex);
            // The double-to-long cast saturates at Long.MAX_VALUE; adding 1 would wrap negative.
            return scaled == Long.MAX_VALUE ? Long.MAX_VALUE : scaled + 1;
        }
    }

    /**
     * Reports whether this checker tolerates speculative task attempts.
     *
     * @return {@code true} when no configured policy is of type {@code ERR_FILE}, or when
     *         speculative execution has been explicitly allowed alongside error-file policies
     */
    @Override
    public boolean isSpeculativeAttemptSafe() {
        boolean hasErrFilePolicy = this.list.stream()
            .anyMatch(policy -> policy.getType().equals(RowLevelPolicy.Type.ERR_FILE));
        return !hasErrFilePolicy || this.allowSpeculativeExecWhenWriteErrFile;
    }

    /**
     * Creates a checker configured from an empty {@link State}, so every setting read from the
     * state takes its default value.
     *
     * @param list policies to evaluate for each record
     * @param str identifier appended to error-file names when non-empty
     * @param fileSystem file system used for error files
     */
    public RowLevelPolicyChecker(List<RowLevelPolicy> list, String str, FileSystem fileSystem) {
        this(list, str, fileSystem, new State());
    }

    public RowLevelPolicyChecker(List<RowLevelPolicy> list, String str, FileSystem fileSystem, State state) {
        this.list = list;
        this.stateId = str;
        this.fs = fileSystem;
        this.errFileOpen = false;
        this.results = new RowLevelPolicyCheckResults();
        this.sampler = new FrontLoadedSampler(state.getPropAsLong("qualitychecker.row.errFile.recordsPerTask", 1000000L), 1.5d);
        this.allowSpeculativeExecWhenWriteErrFile = state.getPropAsBoolean(ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY, true);
    }

    /**
     * Runs the configured policies against one record, stopping at the first failure.
     *
     * @param obj the record to check
     * @param rowLevelPolicyCheckResults receives the result of every policy that was executed
     * @return {@code true} if every policy passed, {@code false} as soon as one does not
     * @throws IOException if writing a failed record to the error file fails
     */
    public boolean executePolicies(Object obj, RowLevelPolicyCheckResults rowLevelPolicyCheckResults) throws IOException {
        for (RowLevelPolicy policy : this.list) {
            RowLevelPolicy.Result outcome = policy.executePolicy(obj);
            rowLevelPolicyCheckResults.put(policy, outcome);
            boolean passed = checkResult(outcome, policy, obj);
            if (!passed) {
                // Policies after the first failing one are not evaluated for this record.
                return false;
            }
        }
        return true;
    }

    /**
     * Acts on the result of a single policy for a single record.
     *
     * @param result outcome reported by the policy
     * @param rowLevelPolicy the policy that produced {@code result}
     * @param obj the record that was checked
     * @return {@code true} unless {@code result} is {@code FAILED}
     * @throws RuntimeException if a policy of type {@code FAIL} failed
     * @throws IOException if the error file cannot be opened or written
     */
    protected boolean checkResult(RowLevelPolicy.Result result, RowLevelPolicy rowLevelPolicy, Object obj) throws IOException {
        if (!result.equals(RowLevelPolicy.Result.FAILED)) {
            return true;
        }
        if (rowLevelPolicy.getType().equals(RowLevelPolicy.Type.FAIL)) {
            throw new RuntimeException("RowLevelPolicy " + rowLevelPolicy + " failed on record " + obj);
        }
        // The sampler is only consulted (and its counter only advanced) for ERR_FILE policies.
        if (rowLevelPolicy.getType().equals(RowLevelPolicy.Type.ERR_FILE) && this.sampler.acceptNext()) {
            if (!this.errFileOpen) {
                // Lazily create the error file, named after the first policy that needs it.
                this.writer = new RowLevelErrFileWriter(this.fs);
                this.writer.open(getErrFilePath(rowLevelPolicy));
            }
            this.writer.write(obj);
            this.errFileOpen = true;
        }
        return false;
    }

    /**
     * Builds the error-file path for a policy.
     *
     * <p>The file name is the sanitized {@code toString()} of the policy, followed by
     * {@code -<stateId>} when a state id is set, followed by {@code -<current millis>} when
     * speculative execution with error files is allowed, and ending in {@code .err}.
     *
     * @param rowLevelPolicy policy supplying the name stem and the error-file location
     * @return path of the error file under the policy's error-file location
     */
    Path getErrFilePath(RowLevelPolicy rowLevelPolicy) {
        StringBuilder fileName = new StringBuilder();
        fileName.append(HadoopUtils.sanitizePath(rowLevelPolicy.toString(), "-"));
        if (!Strings.isNullOrEmpty(this.stateId)) {
            fileName.append("-").append(this.stateId);
        }
        if (this.allowSpeculativeExecWhenWriteErrFile) {
            // The timestamp suffix gives each attempt its own file name.
            fileName.append("-").append(System.currentTimeMillis());
        }
        return new Path(rowLevelPolicy.getErrFileLocation(), fileName.append(".err").toString());
    }

    /**
     * Closes the error-file writer if one is currently open; otherwise does nothing.
     *
     * @throws IOException if closing the writer fails, in which case the checker still
     *         considers the error file open
     */
    @Override
    public void close() throws IOException {
        if (!this.errFileOpen) {
            return;
        }
        this.writer.close();
        this.errFileOpen = false;
    }

    /**
     * Collects the final state of every configured policy into one {@link State}.
     *
     * @return a new state holding the merged final states, added in policy-list order
     */
    public State getFinalState() {
        State merged = new State();
        for (RowLevelPolicy policy : this.list) {
            merged.addAll(policy.getFinalState());
        }
        return merged;
    }

    /**
     * Filters a record stream through the configured policies.
     *
     * <p>Control messages are passed to the message handler and kept. Record envelopes are kept
     * only if every policy passes; rejected envelopes are acked before being dropped. Any other
     * stream entity is kept untouched. The checker is closed when the stream terminates.
     *
     * @param recordStreamWithMetadata the incoming stream
     * @param workUnitState not used by this implementation
     * @return the incoming stream with its record stream replaced by the filtered one
     */
    public RecordStreamWithMetadata<D, S> processStream(RecordStreamWithMetadata<D, S> recordStreamWithMetadata, WorkUnitState workUnitState) {
        return recordStreamWithMetadata.withRecordStream(recordStreamWithMetadata.getRecordStream().filter(entity -> {
            if (entity instanceof ControlMessage) {
                getMessageHandler().handleMessage((ControlMessage) entity);
                return true;
            }
            if (entity instanceof RecordEnvelope) {
                boolean passed = executePolicies(((RecordEnvelope) entity).getRecord(), this.results);
                if (passed) {
                    return true;
                }
                // A dropped record never reaches downstream, so it is acked here.
                entity.ack();
                return false;
            }
            return true;
        }).doFinally(this::close));
    }

    /**
     * Creates the handler applied to control messages seen by {@code processStream}.
     *
     * @return a handler that closes the error file on a {@link FlushControlMessage} and ignores
     *         every other message; a failure to close is logged, not propagated
     */
    protected ControlMessageHandler getMessageHandler() {
        return new ControlMessageHandler() {
            @Override
            public void handleMessage(ControlMessage controlMessage) {
                if (!(controlMessage instanceof FlushControlMessage)) {
                    return;
                }
                try {
                    close();
                } catch (IOException e) {
                    log.error("Failed to close errFile", e);
                }
            }
        };
    }

    /**
     * @return the list of policies this checker was constructed with (the same instance, not a copy)
     */
    public List<RowLevelPolicy> getList() {
        return this.list;
    }

    /**
     * @return the results object that {@code processStream} passes to {@code executePolicies}
     *         for every record
     */
    public RowLevelPolicyCheckResults getResults() {
        return this.results;
    }
}
