package com.cloudera.cdx.extractor.yarn;

import com.cloudera.cdx.extractor.model.TelemetryPublisherCountersMap;
import com.cloudera.cdx.extractor.model.YarnJob;
import com.cloudera.cdx.extractor.util.ExtractorUtil;
import com.cloudera.cdx.extractor.util.HdfsFileSystemUtils;
import com.cloudera.cdx.extractor.util.ValidationException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.Validation;
import javax.validation.Validator;
import javax.ws.rs.NotFoundException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cdx/extractor/yarn/YarnJobExtractorRunnable.class */
public class YarnJobExtractorRunnable {
    private static final Logger LOG = LoggerFactory.getLogger(YarnJobExtractorRunnable.class);
    private static final int MAX_PROP_VALUE_LEN = 1000;
    private static final String SUB_DIR = "YarnPoller/";
    public static final int NO_FILE_SIZE_CONSTRAINT = -1;
    private final TelemetryPublisherCountersMap tpCounters;
    private final YarnHistoryClient yarnHistoryClient;
    private final ExtractionStatus prevExtractionStatus;
    private YarnJob activeMrJob;
    private YarnExtractorContext context;
    protected final Validator validator;
    private final MRReporter mrReporter;

    /* loaded from: input_file:com/cloudera/cdx/extractor/yarn/YarnJobExtractorRunnable$ExtractionStatus.class */
    public static class ExtractionStatus {
        private final String SUCCESS = "success";
        private final String FAILED = "failed";
        private String id;
        private boolean successStatus;
        private long finishTime;
        private long startTime;
        private Throwable exception;

        public void setId(String str) {
            this.id = str;
        }

        public void setSuccessStatus(boolean z) {
            this.successStatus = z;
        }

        public boolean isSuccessStatus() {
            return this.successStatus;
        }

        public void setFinishTime(long j) {
            this.finishTime = j;
        }

        public long getFinishTime() {
            return Long.valueOf(this.finishTime).longValue();
        }

        public String getId() {
            return this.id;
        }

        public void setStartTime(long j) {
            this.startTime = j;
        }

        public long getStartTime() {
            return this.startTime;
        }

        public void setException(Throwable th) {
            this.exception = th;
        }

        public Throwable getException() {
            return this.exception;
        }
    }

    public YarnJobExtractorRunnable(YarnExtractorContext yarnExtractorContext, TelemetryPublisherCountersMap telemetryPublisherCountersMap) {
        this(yarnExtractorContext, telemetryPublisherCountersMap, null, null, null);
    }

    public YarnJobExtractorRunnable(YarnExtractorContext yarnExtractorContext, TelemetryPublisherCountersMap telemetryPublisherCountersMap, YarnHistoryClient yarnHistoryClient, YarnJob yarnJob, ExtractionStatus extractionStatus) {
        this.context = yarnExtractorContext;
        this.mrReporter = yarnExtractorContext.getReporter();
        this.validator = Validation.buildDefaultValidatorFactory().getValidator();
        this.tpCounters = telemetryPublisherCountersMap;
        this.activeMrJob = yarnJob;
        this.yarnHistoryClient = yarnHistoryClient;
        this.prevExtractionStatus = extractionStatus;
    }

    public void runWithYarnJob(YarnHistoryClient yarnHistoryClient, YarnJob yarnJob) {
        LOG.debug("Processing MR Job {}.", yarnJob.getId());
        collectMRJob(this.context, yarnHistoryClient, yarnJob);
        LOG.debug("Processeded MR Job {}.", yarnJob.getId());
    }

    public ExtractionStatus run() {
        ExtractionStatus extractionStatus = new ExtractionStatus();
        extractionStatus.setStartTime(this.activeMrJob.getStartTime());
        extractionStatus.setFinishTime(this.activeMrJob.getFinishTime());
        extractionStatus.setId(this.activeMrJob.getId());
        LOG.debug("Application {} is getting extracted in thread {}", extractionStatus.getId(), Thread.currentThread().getName());
        try {
            Preconditions.checkNotNull(this.activeMrJob);
            Preconditions.checkNotNull(this.yarnHistoryClient);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Processing with active job {}", this.activeMrJob.getId());
            }
            runWithYarnJob(this.yarnHistoryClient, this.activeMrJob);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Completed Running with active job {}", this.activeMrJob.getId());
            }
            extractionStatus.setSuccessStatus(true);
            LOG.debug("Completed fetching the application {}", this.activeMrJob.getId());
        } catch (Throwable th) {
            extractionStatus.setSuccessStatus(false);
            extractionStatus.setException(th);
            LOG.error(String.format("Error encountered in fetching the job  %s with finish time %d for the serice %s.", this.activeMrJob.getId(), Long.valueOf(this.activeMrJob.getFinishTime()), this.context.getServiceName()));
        }
        return extractionStatus;
    }

    private boolean collectMRJob(YarnExtractorContext yarnExtractorContext, YarnHistoryClient yarnHistoryClient, YarnJob yarnJob) {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Processing yarn application {} for configuration.", yarnJob.getId());
            }
            YarnJobConf jobConf = yarnHistoryClient.getJobConf(yarnJob.getId());
            yarnJob.setJobConf(convertJobConf(jobConf));
            File file = null;
            if (yarnExtractorContext.getJhistConfExporter() != null) {
                try {
                    file = extractJhistTarball(jobConf.path, yarnJob.getId());
                } catch (Exception e) {
                    this.tpCounters.get(yarnExtractorContext.getJhistConfExporter().getStreamName()).incrementIngestFailCount();
                    throw e;
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully processed configuration for yarn application {}.", yarnJob.getId());
            }
            extract(yarnJob, file);
            if (!LOG.isDebugEnabled()) {
                return true;
            }
            LOG.debug("Successfully processed yarn application {}.", yarnJob.getId());
            return true;
        } catch (NotFoundException e2) {
            LOG.warn("Yarn MapReduce Job {} was not found.", yarnJob.getId());
            return false;
        }
    }

    private File extractJhistTarball(String str, String str2) {
        File file = new File(this.context.getOptions().getStorageDirectory(), SUB_DIR + str2);
        if (file.exists()) {
            for (File file2 : file.listFiles()) {
                file2.delete();
            }
        } else if (!file.mkdirs()) {
            throw new RuntimeException("Failed to create " + file.toString());
        }
        File file3 = new File(file, new File(str).getName());
        if (!HdfsFileSystemUtils.loadFileLocally(this.context.getHadoopConf(), str, file3.toString(), this.context.getOptions().getHdfsSuperuser(), -1L)) {
            LOG.warn(String.format("Fetching of the file {} is dropped as it is bigger than the max allowed size {} for the service {}.", file3.toString(), -1L, this.context.getServiceName()));
            return null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fetched file {} of size {}.", file3.toString(), Long.valueOf(file3.length()));
        }
        List matchingFiles = HdfsFileSystemUtils.getMatchingFiles(this.context.getHadoopConf(), str.substring(0, str.lastIndexOf("/")), str2 + "*jhist", this.context.getOptions().getHdfsSuperuser());
        if (matchingFiles.size() != 1) {
            throw new RuntimeException("Expected one Jhist file, but found " + matchingFiles.size() + "for jobId:" + str2);
        }
        File file4 = new File(file, new File((String) matchingFiles.get(0)).getName());
        if (!HdfsFileSystemUtils.loadFileLocally(this.context.getHadoopConf(), (String) matchingFiles.get(0), file4.toString(), this.context.getOptions().getHdfsSuperuser(), this.context.getOptions().getMaxAllowedSizeForYarnJobs())) {
            LOG.warn(String.format("Fetching of the file {} is dropped as it is bigger than the max allowed size {} for the service {}.", file4.toString(), -1L, this.context.getServiceName()));
            return null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fetched file {} of size {}.", file4.toString(), Long.valueOf(file4.length()));
        }
        try {
            File compressAndDeleteDir = ExtractorUtil.compressAndDeleteDir(file);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Created tar ball {} for application {}", compressAndDeleteDir.toString(), str2);
                LOG.debug("tar ball {} size is {}", compressAndDeleteDir.toString(), Long.valueOf(compressAndDeleteDir.length()));
            }
            return compressAndDeleteDir;
        } catch (IOException e) {
            LOG.error("Failed to compress jhist directory " + file.toString(), e);
            throw new RuntimeException(e);
        }
    }

    @VisibleForTesting
    static Map<String, String> convertJobConf(YarnJobConf yarnJobConf) {
        HashMap newHashMap = Maps.newHashMap();
        if (yarnJobConf.properties != null) {
            for (YarnConfProperty yarnConfProperty : yarnJobConf.properties) {
                newHashMap.put(yarnConfProperty.name, StringUtils.abbreviate(yarnConfProperty.value, MAX_PROP_VALUE_LEN));
            }
        }
        return newHashMap;
    }

    protected void extract(YarnJob yarnJob, File file) {
        String streamName = this.context.getCdxExporter().getStreamName();
        try {
            yarnJob.setCdxId(this.context.getMrIdGenerator().generateJobExecIdentity(this.context.getService(), yarnJob.getId()));
            yarnJob.setTemplateId(this.context.getMrIdGenerator().generateJobIdentity(this.context.getService(), yarnJob.getName(), MRUtils.getConfValue(yarnJob.getJobConf(), MRUtils.MAPPER_CFG), MRUtils.getConfValue(yarnJob.getJobConf(), MRUtils.REDUCER_CFG)));
            yarnJob.setSourceId(this.context.getService().getCdxId());
            yarnJob.setExtractionTime(Instant.now());
            this.mrReporter.addJobExecs(1);
            try {
                validate(yarnJob);
            } catch (ValidationException e) {
                LOG.warn("Invalid YarnJob", e);
            }
            publish(yarnJob);
            this.tpCounters.get(streamName).incrementIngestSuccessCount();
            if (file != null) {
                streamName = this.context.getJhistConfExporter().getStreamName();
                publishJhist(file);
                this.tpCounters.get(streamName).incrementIngestSuccessCount();
            }
        } catch (Exception e2) {
            this.tpCounters.get(streamName).incrementIngestFailCount();
            throw e2;
        }
    }

    private void publishJhist(File file) {
        Preconditions.checkNotNull(file);
        this.context.getJhistConfExporter().send(file);
        LOG.debug("Sent jhist for job {}", file.getName());
    }

    private void publish(YarnJob yarnJob) {
        this.context.getCdxExporter().send(yarnJob);
        LOG.debug("Sent job {}", yarnJob.getId());
    }

    private void validate(YarnJob yarnJob) {
        Set validate = this.validator.validate(yarnJob, new Class[0]);
        if (CollectionUtils.isNotEmpty(validate)) {
            throw new ValidationException(validate);
        }
    }
}
