Source code for saberx.sabercore.triggers.processhandler

"""
.. module:: processhandler
   :synopsis: Module for evaluating process trigger.
"""

import psutil
import subprocess
import re

[docs]class ProcessHandler: """ **Class for performing process trigger operation """
[docs] @staticmethod def get_name_count(regex): """ **Method for getting count of process** This method returns the count of processes whose name matches the given regex Args: regex (string): String containing regex for filtering process names Returns: bool, Integer, String : status, count, error if any """ pattern = re.compile(regex) count = 0 try: # loop over all the processes fetched via psutil for proc in psutil.process_iter(attrs=['name']): proc_name = proc.info['name'] # filter them using the regex if pattern.search(proc_name): count += 1 return True, count, None except Exception: ''' Log error ''' return False, None, "CANNOT_ACCESS_PROCESS_NAMES"
[docs] @staticmethod def get_cmdline_count(regex): """ **Method for getting count of process** This method returns the count of processes whose cmdline matches the given regex Args: regex (string): String containing regex for filtering process cmdline Returns: bool, Integer, String : status, count, error if any """ # command for getting all processes in the system filtered # by the process name command = 'ps aux | grep -E "{}" | grep -v grep | wc -l'.format(regex) proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) output, errors = proc.communicate() proc_exit_code = proc.returncode if proc_exit_code != 0: return False, None, "CANNOT_ACCESS_PROCESSES" output = int(output.strip()) return True, output, None
[docs] @staticmethod def check_name(regex): """ **Method for checking if a process exists by name** This method checks if there is any process whose name matches the given pattern Args: regex (string): String containing regex Returns: bool, Integer, String : status, count, error if any """ response, count, error = ProcessHandler.get_name_count(regex) if error: return False, error if count != 0: return True, None return False, None
[docs] @staticmethod def check_cmdline(regex): """ **Method for checking if a process exists by cmdline text** This method checks if there is any process whose cmdline arg matches the given pattern Args: regex (string): String containing regex Returns: bool, Integer, String : status, count, error if any """ response, count, error = ProcessHandler.get_cmdline_count(regex) if error: return False, error if count != 0: return True, None return False, None
@staticmethod def __operate(current, count, operator): """ **Method to evaluate the required operation** Args: current (Integer) : Current number of processes filtered by the given regex count (Integer) : Desired threshold operator (String) : The operation to be performed Returns: bool: Result of the operattion """ return { '=': lambda current, count: current == count, '<': lambda current, count: current < count, '>': lambda current, count: current > count, '<=': lambda current, count: current <= count, '>=': lambda current, count: current >= count }.get(operator)(current, count)
[docs] @staticmethod def check_name_count(regex, count, operator): """ **Method to get the Number of regex filtered processes and perform the deired operation** Args: regex (string): Regex to filter processes count (string): Threshold operation (string): Desired operation Returns: bool, string : Result, error if any """ response, current, error = ProcessHandler.get_name_count(regex) if error: return False, error operation_result = ProcessHandler.__operate(current, count, operator) return operation_result, None
[docs] @staticmethod def check_cmdline_count(regex, count, operator): """ **Method to get the Number of regex filtered processes by cmd and perform the deired operation** Args: regex (string): Regex to filter processes count (string): Threshold operation (string): Desired operation Returns: bool, string : Result, error if any """ response, current, error = ProcessHandler.get_cmdline_count(regex) if error: return False, error operation_result = ProcessHandler.__operate(current, count, operator) return operation_result, None
if __name__ == "__main__": print (ProcessHandler.check_cmdline_count("/usr/sbin/haproxy", 2, "<="))