package com.cloudera.cdx.extractor.spark;

import com.cloudera.cdx.extractor.ServiceExtractionTask;
import com.cloudera.cdx.extractor.model.TelemetryPublisherCountersMap;
import com.cloudera.cdx.extractor.spark.SparkApplication;
import com.cloudera.cdx.extractor.util.ExtractorUtil;
import com.cloudera.cdx.extractor.util.HdfsFileSystemUtils;
import com.cloudera.cmf.cdhclient.util.ThrottlingLogger;
import com.cloudera.cmf.version.VersionString;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.SimpleTimeZone;
import java.util.concurrent.TimeUnit;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.transport.http.HTTPConduit;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/cdx/extractor/spark/SparkPoller.class */
/**
 * Extraction task that asks the Spark History Server for completed applications, downloads each
 * application attempt's event log (and its optional ".des_events_" companion file) to local
 * storage, compresses the result and hands it to the context's event log exporter.
 *
 * <p>Progress between runs is kept in a {@link SparkPollerState} (time of the last finished run
 * and the ids exported by it) loaded from and saved to the context's state store.
 */
public class SparkPoller implements ServiceExtractionTask {
    private static final Logger LOG = LoggerFactory.getLogger(SparkPoller.class);
    private static final Logger THROTTLED_LOG = new ThrottlingLogger(LOG, Duration.standardMinutes(15));

    /** Total time to wait for an in-progress DES event file to be finalized. */
    private static final int DES_WAIT_TIME_IN_SECONDS = 30;
    /** Pause between two checks of an in-progress DES event file. */
    private static final long DES_POLL_INTERVAL_MILLIS = 1000L;
    /** File name suffix of a DES event file that is still being written. */
    private static final String IN_PROGRESS_EVENTS = ".in_progress_events";
    /** Sub-directory of the configured storage directory used for downloaded logs. */
    private static final String SUB_DIR = "SparkPoller/";
    /** Timestamp pattern sent to the history server; always rendered in GMT. */
    private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'";
    /** From this CDH version on, applications are always queried by end time. */
    private static final VersionString MIN_CDH_VERSION_SUPPORTED = VersionString.of("6.0");

    private static TelemetryPublisherCountersMap tpCounters = TelemetryPublisherCountersMap.getInstance();

    private final SparkExtractorContext context;

    /**
     * @param sparkExtractorContext per-service context supplying options, clients and exporters
     * @throws NullPointerException if {@code sparkExtractorContext} is null
     */
    public SparkPoller(SparkExtractorContext sparkExtractorContext) {
        this.context = Preconditions.checkNotNull(sparkExtractorContext);
    }

    /**
     * Formats {@code instant} with {@link #DATE_FORMAT_PATTERN} in GMT.
     *
     * <p>A formatter is created per call because {@link SimpleDateFormat} is not thread-safe and
     * must not be shared through a static field.
     */
    private static String formatTimestamp(Instant instant) {
        SimpleDateFormat formatter = new SimpleDateFormat(DATE_FORMAT_PATTERN);
        formatter.setCalendar(Calendar.getInstance(new SimpleTimeZone(0, "GMT")));
        return formatter.format(instant.toDate());
    }

    /**
     * Computes the instant from which completed applications are requested.
     *
     * <p>The result is the last finished time recorded in the state, clamped so that it is never
     * earlier than {@code instant} minus the maximum lookback period and never later than
     * {@code instant} minus the regular lookback period.
     */
    private Instant getStartTimeForPoller(SparkPollerState sparkPollerState, Instant instant) {
        long latestAllowedStart = instant.minus(this.context.getOptions().getShsPollerLookbackPeriod()).getMillis();
        long earliestAllowedStart = instant.minus(this.context.getOptions().getShsPollerMaxLookbackPeriod()).getMillis();
        long resumePoint = Math.max(earliestAllowedStart, sparkPollerState.getLastFinishedTime());
        return new Instant(Math.min(latestAllowedStart, resumePoint));
    }

    /**
     * Runs one polling cycle using the current time.
     *
     * <p>NOTE(review): the decompiler reports this method was originally named {@code run} and was
     * renamed while merging a bridge method; the name is left untouched here - confirm against
     * {@code ServiceExtractionTask} before renaming.
     */
    /* renamed from: run, reason: merged with bridge method [inline-methods] */
    public SparkPollerState m7run() {
        return runImpl(Instant.now());
    }

    /**
     * Runs one polling cycle as of {@code instant}.
     *
     * <p>Any failure after the state has been loaded is logged and swallowed; in that case the
     * state is not saved. The history server client (when one was created) and the event log
     * exporter are always closed before returning.
     *
     * @param instant the reference "now" of this run
     * @return the loaded (or freshly created) state, updated only when the run succeeded
     */
    @VisibleForTesting
    SparkPollerState runImpl(Instant instant) {
        SparkPollerState state = loadState();
        if (state == null) {
            state = new SparkPollerState();
        }
        SparkHistoryClient client = null;
        try {
            client = createShsClient();
            Instant startTime = getStartTimeForPoller(state, instant);
            LOG.info("Will fetch spark applications which completed after {}", startTime.toString());
            List<SparkApplication> applications = fetchCompletedApplications(client, startTime);
            LOG.debug("There are {} spark applications received from history server for service {}.",
                    applications.size(), this.context.getServiceName());
            HashSet<String> exportedAppIds = exportApplications(applications, state);
            state.setLastFinishedTime(instant.minus(this.context.getOptions().getShsAppLoadingBufferPeriod()).getMillis());
            state.setLastExportedApps(exportedAppIds);
            saveState(state);
        } catch (Throwable t) {
            // A polling cycle never propagates failures to its caller; it only reports them.
            LOG.error("Error encountered in processing the spark service {}.", this.context.getServiceName(), t);
        } finally {
            releaseResources(client);
        }
        return state;
    }

    /**
     * Requests the completed applications from the history server, choosing the query that
     * matches the cluster version: by end time for CDH 6.0 and later and for Spark versions not
     * starting with "2.0." or "2.1."; the plain application query for Spark 1 and, when enabled
     * in the options, for Spark 2.0/2.1. Otherwise an empty list is returned.
     */
    private List<SparkApplication> fetchCompletedApplications(SparkHistoryClient client, Instant startTime) {
        String since = formatTimestamp(startTime);
        if (VersionString.of(this.context.getCdhVersion()).compareTo(MIN_CDH_VERSION_SUPPORTED) >= 0) {
            return client.getApplicationsByEndTime(SafariSparkPoller.COMPLETED, since);
        }
        if (this.context.isSpark1()) {
            return client.getApplications(SafariSparkPoller.COMPLETED, since);
        }
        String sparkVersion = client.getSparkVersion().spark;
        boolean isBefore2_2 = sparkVersion.startsWith("2.0.") || sparkVersion.startsWith("2.1.");
        if (!isBefore2_2) {
            return client.getApplicationsByEndTime(SafariSparkPoller.COMPLETED, since);
        }
        if (this.context.getOptions().isSparkBefore2_2Supported()) {
            return client.getApplications(SafariSparkPoller.COMPLETED, since);
        }
        THROTTLED_LOG.info("Ignoring Spark applications because version {} is below 2.2", sparkVersion);
        return Lists.newArrayList();
    }

    /**
     * Exports every application that was not exported by the previous run and has attempts.
     *
     * <p>The first export failure increments the ingest failure counter and is rethrown, which
     * aborts the remaining applications.
     *
     * @return ids of the applications exported now or already exported by the previous run
     */
    private HashSet<String> exportApplications(List<SparkApplication> applications, SparkPollerState state) {
        HashSet<String> exportedAppIds = Sets.newHashSet();
        for (SparkApplication application : applications) {
            if (state.getLastExportedApps() != null && state.getLastExportedApps().contains(application.id)) {
                LOG.debug("Skipping already exported app {}", application.id);
                exportedAppIds.add(application.id);
                continue;
            }
            if (application.attempts == null) {
                LOG.warn("Received Spark App {} with no attempts", application.id);
                continue;
            }
            LOG.debug("Received Spark App {} with {} attempts", application.id, application.attempts.size());
            String streamName = this.context.getEventLogExporter().getStreamName();
            try {
                exportEventLog(application);
                tpCounters.get(streamName).incrementIngestSuccessCount();
                exportedAppIds.add(application.id);
            } catch (Exception e) {
                tpCounters.get(streamName).incrementIngestFailCount();
                throw e;
            }
        }
        return exportedAppIds;
    }

    /**
     * Closes the history server client (when one was created) and then the event log exporter.
     * Failures of either close are logged and never propagated, so one failing close cannot
     * prevent the other.
     */
    private void releaseResources(SparkHistoryClient client) {
        if (client != null) {
            try {
                closeClient(client);
            } catch (Throwable t) {
                LOG.error("Error encountered in processing the spark service {}.", this.context.getServiceName(), t);
            }
        }
        try {
            this.context.getEventLogExporter().close(this.context.getOptions().getExporterShutdownSeconds(), TimeUnit.SECONDS);
        } catch (Throwable t) {
            LOG.debug("Error encountered in processing the service {}.", this.context.getServiceName(), t);
        }
    }

    /** Loads the poller state stored under the service's CDX id; the result is null-checked by the caller. */
    private SparkPollerState loadState() {
        return (SparkPollerState) this.context.getStateStore().load(SparkPollerState.class, new String[]{this.context.getService().getCdxId()});
    }

    /** Saves the poller state under the service's CDX id. */
    private void saveState(SparkPollerState sparkPollerState) {
        this.context.getStateStore().save(sparkPollerState, new String[]{this.context.getService().getCdxId()});
    }

    /**
     * Exports the event log of every attempt of {@code sparkApplication}.
     *
     * @param sparkApplication application whose {@code attempts} must not be null
     * @throws RuntimeException if a local directory cannot be prepared or compressing/sending fails
     */
    @VisibleForTesting
    void exportEventLog(SparkApplication sparkApplication) {
        LOG.debug("Processing spark application {}", sparkApplication.id);
        for (SparkApplication.SparkApplicationAttempt attempt : sparkApplication.attempts) {
            // Attempts without an id are keyed by the application id alone.
            String attemptKey = attempt.attemptId == null
                    ? sparkApplication.id
                    : String.format("%s_%s", sparkApplication.id, attempt.attemptId);
            exportAttempt(sparkApplication, attemptKey);
        }
        LOG.trace("Processed spark application {}", sparkApplication.id);
    }

    /**
     * Downloads the event log matching {@code attemptKey} into a dedicated local directory, adds
     * the DES event file when available, then compresses the directory and sends the archive.
     * When several remote files match, the first one is used.
     */
    private void exportAttempt(SparkApplication sparkApplication, String attemptKey) {
        File localDir = new File(this.context.getOptions().getStorageDirectory(), SUB_DIR + attemptKey);
        createApplicationStorageDir(localDir);
        List<?> matches = HdfsFileSystemUtils.getMatchingFiles(this.context.getHadoopConf(), this.context.getEventLogDir(), attemptKey + "*", this.context.getOptions().getHdfsSuperuser());
        if (matches == null || matches.isEmpty()) {
            LOG.warn("Did not find any event logs for {}", attemptKey);
            return;
        }
        String remoteLog = (String) matches.get(0);
        if (matches.size() > 1) {
            LOG.warn("Found {} event logs for {}, will use {}", matches.size(), attemptKey, remoteLog);
        }
        File localLog = new File(localDir, new File(remoteLog).getName());
        boolean fetched = HdfsFileSystemUtils.loadFileLocally(this.context.getHadoopConf(), remoteLog, localLog.toString(), this.context.getOptions().getHdfsSuperuser(), this.context.getOptions().getMaxAllowedSizeForSparkJobs());
        if (!fetched) {
            LOG.warn("File {} is not fetched for spark service {}", remoteLog, this.context.getServiceName());
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fetched the file {} from {}.", localLog, remoteLog);
            LOG.debug("Fetched the file {} of size {}.", localLog, localLog.length());
        }
        fetchDesEventFileForSparkApplication(localDir, sparkApplication, attemptKey);
        try {
            File archive = ExtractorUtil.compressAndDeleteDir(localDir);
            this.context.getEventLogExporter().send(archive);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Generated tar ball {} for spark application {}", archive, sparkApplication.id);
                LOG.debug("File {} has size {}.", archive, archive.length());
            }
            LOG.debug("Sent event logs for app {}", archive.getName());
        } catch (IOException e) {
            LOG.error("Failed to compress event log directory {}", localDir, e);
            throw new RuntimeException("Failed to compress or send event logs in " + localDir, e);
        }
    }

    /**
     * Ensures {@code file} exists as a directory: creates it when missing, otherwise deletes the
     * entries directly inside it (a failed delete is logged, not fatal).
     *
     * @throws RuntimeException if the directory cannot be created or its content cannot be listed
     */
    private void createApplicationStorageDir(File file) {
        if (!file.exists()) {
            if (!file.mkdirs()) {
                throw new RuntimeException("Failed to create " + file.toString());
            }
            return;
        }
        // listFiles() returns null when the path is not a directory or an I/O error occurs.
        File[] leftovers = file.listFiles();
        if (leftovers == null) {
            throw new RuntimeException("Failed to list " + file + "; it is not a directory or cannot be read");
        }
        for (File leftover : leftovers) {
            if (!leftover.delete()) {
                LOG.warn("Failed to delete stale file {}", leftover);
            }
        }
    }

    /**
     * Downloads the ".des_events_" file of an application attempt into {@code file}, when one
     * exists. While the first matching remote file still carries the in-progress suffix, it is
     * re-checked once per second for up to {@link #DES_WAIT_TIME_IN_SECONDS} seconds; a file that
     * is missing or still in progress after that is skipped.
     *
     * @param file local directory receiving the DES file (stored without its leading dot)
     * @param sparkApplication unused; kept for signature compatibility
     * @param str attempt key used to build the DES file name
     * @throws RuntimeException if the thread is interrupted while waiting (interrupt flag is restored)
     */
    private void fetchDesEventFileForSparkApplication(File file, SparkApplication sparkApplication, String str) {
        File remoteDes = new File(this.context.getEventLogDir(), String.format(".des_events_%s", str));
        DateTime deadline = DateTime.now().plusSeconds(DES_WAIT_TIME_IN_SECONDS);
        List<?> matches = null;
        while (DateTime.now().isBefore(deadline)) {
            matches = HdfsFileSystemUtils.getMatchingFiles(this.context.getHadoopConf(), this.context.getEventLogDir(), remoteDes.getName() + "*", this.context.getOptions().getHdfsSuperuser());
            if (matches == null || matches.isEmpty()) {
                LOG.debug("Did not find any event logs for {}.", str);
                return;
            }
            if (!((String) matches.get(0)).endsWith(IN_PROGRESS_EVENTS)) {
                break;
            }
            LOG.info("The des file is not finalized yet. Wait for {} seconds checking for every 1 second.", DES_WAIT_TIME_IN_SECONDS);
            try {
                Thread.sleep(DES_POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can observe the interruption.
                Thread.currentThread().interrupt();
                LOG.warn("Application was interrupted while waiting for DES event file {}.", str, e);
                throw new RuntimeException(e);
            }
        }
        if (matches == null || matches.isEmpty() || ((String) matches.get(0)).endsWith(IN_PROGRESS_EVENTS)) {
            LOG.info("There are no DES event log files generated for the application {}.", str);
            return;
        }
        LOG.debug("Found the des file {} for the application {}.", matches.get(0), str);
        File localDes = new File(file, remoteDes.getName().substring(1));
        // NOTE(review): the download uses the constructed path rather than matches.get(0); the glob
        // can also match files of other ids sharing this prefix, so the exact name is kept - confirm.
        boolean fetched = HdfsFileSystemUtils.loadFileLocally(this.context.getHadoopConf(), remoteDes.toString(), localDes.toString(), this.context.getOptions().getHdfsSuperuser(), this.context.getOptions().getMaxAllowedSizeForSparkJobs());
        if (!fetched) {
            LOG.warn("Did not fetch the file {} for the service {} as the file is too big.", localDes.toString(), this.context.getServiceName());
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fetched the file {} for the service {}.", localDes.toString(), this.context.getServiceName());
            LOG.debug("Fetched the file {} has size {}.", localDes.toString(), localDes.length());
        }
    }

    /** Creates a history server client for the context's SHS URL and security settings. */
    @VisibleForTesting
    SparkHistoryClient createShsClient() {
        return (SparkHistoryClient) ExtractorUtil.createClient(SparkHistoryClient.class, this.context.getShsUrl(), true, this.context.isSecure(), this.context.isSslEnabled(), this.context.getTrustManagerProvider());
    }

    /**
     * Closes the HTTP conduit backing {@code sparkHistoryClient}.
     *
     * @throws IllegalArgumentException if the client has no HTTP conduit
     */
    @VisibleForTesting
    void closeClient(SparkHistoryClient sparkHistoryClient) {
        HTTPConduit httpConduit = WebClient.getConfig(sparkHistoryClient).getHttpConduit();
        if (httpConduit == null) {
            throw new IllegalArgumentException("Client is not using the HTTP transport");
        }
        httpConduit.close();
    }
}
