File: //opt/sharedrads/python/send_customer_str/check_today_cp
#!/usr/lib/rads/venv/bin/python3
"""Prints a CPU Usage Report in HTML"""
__author__ = 'chases'
import gzip
from datetime import date
from collections import defaultdict
import subprocess
import time
import sys
from pathlib import Path
import argparse
from prettytable import PrettyTable
import rads
ONE_DAY = 86400
def parse_args() -> str:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'-u',
'--user',
action='store',
help='Define your User',
required=True,
)
args = parser.parse_args()
if not rads.is_cpuser(args.user):
sys.exit("Not a valid user.")
return args.user
def parse_hourly_averages(
cpuser: str, today: str
) -> tuple[dict[str, float | None], float]:
now = time.time()
gz_files: list[Path] = []
for entry in Path('/var/log/sa/rads', today).iterdir():
if not entry.name.endswith('avg.gz'):
continue
if entry.name in ('99-avg.gz', '00-avg.gz'):
continue
if now - entry.stat().st_mtime <= ONE_DAY:
gz_files.append(entry)
gz_files.sort(key=lambda x: x.name)
total_cp = 0.0
hour_vs_cp = {}
prev_hourly_cp = 0.0
for gz_file in gz_files:
hour = gz_file.name.split('-')[0]
hour_vs_cp[hour] = None
# userna5 8389 0.10% 180.12re 0.03% 96.80cp 0.59% 0avio 20654k
for line in gzip.open(gz_file):
line = str(line, 'ascii', 'ignore')
if line.startswith(f'{cpuser} '):
# get cp usage for this hour (including hours prior to it)
hourly_cp = float(line.split()[5].replace("cp", ''))
# calc cp usage for only this hour
this_hour_cp = hourly_cp - prev_hourly_cp
total_cp += this_hour_cp
hour_vs_cp[hour] = this_hour_cp
prev_hourly_cp = hourly_cp
break
return hour_vs_cp, total_cp
def get_biggest_process(
hour_vs_cp: dict[str, float | None], cpuser: str, today: str
) -> tuple[dict[str, str], dict[str, str]]:
log_dir = Path('/var/log/sa/rads', today)
top_one = {}
top_summ = {}
for hour, cp_usage in hour_vs_cp.items():
if not cp_usage:
continue
highest_cp = 0.0
highest_one = ''
procs = defaultdict(float)
with subprocess.Popen(
['zgrep', f'^{cpuser} ', str(log_dir / f"{hour}.gz")],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
encoding='ascii',
) as proc:
for line in proc.stdout:
line: str
if '*' in line:
continue
# 'userna5 1.18 cpu 17904k mem 0 io php-cgi \n'
cols = line.split()
name = ' '.join(cols[7:])
cpu = float(cols[1])
# if cpu comes out as 0.00 because it exited so fast, give it
# 0.005s worth of cpu so it registers
procs[name] += cpu or 0.005
if cpu > highest_cp:
highest_cp = cpu
highest_one = name
top_one[hour] = highest_one
if procs:
top_summ[hour] = sorted(procs.items(), key=lambda x: x[1])[-1][0]
else:
top_summ[hour] = ''
return top_one, top_summ
def sa_cm(cpuser: str) -> float | None:
with subprocess.Popen(
['/usr/sbin/sa', '-cm'],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
encoding='ascii',
) as proc:
for line in proc.stdout:
if line.startswith(f'{cpuser} '):
# 'userna5 1 0.00% 0.00re 0.00% 0.00cp 0.00% 0avio 1092k'
return float(line.split()[5].replace('cp', ''))
return None
def print_pretty(
hour_cp: dict[str, float | None],
daily_total: float,
top_one: dict[str, str],
top_summ: dict[str, str],
cpuser: str,
):
tbl = PrettyTable(
["Time", "Percent Usage", "Most CPU Used", "Longest-Running Process"]
)
usage_today = sa_cm(cpuser)
for hour in sorted(hour_cp):
span = f'{int(hour) - 3}:00 - {hour}:00'
if hour_cp[hour]:
percent = f"{hour_cp[hour]/usage_today*100:.2f}"
tbl.add_row([span, percent, top_summ[hour], top_one[hour]])
else:
tbl.add_row([span, 'None', 'None', 'None'])
if usage_today - daily_total > 0:
recent_usage = f"{(usage_today - daily_total) / usage_today * 100:.2f}"
tbl.add_row(['Recent', recent_usage, '', ''])
else:
tbl.add_row(['Recent', 'None', 'None', 'Unable to determine'])
print(tbl.get_html_string(attributes={"class": "proc_body"}))
def main():
"""Prints a CPU Usage Report in HTML"""
today = date.today().strftime('%b/%d')
cpuser = parse_args()
hourly_cp, daily_total = parse_hourly_averages(cpuser, today)
top_one, top_summ = get_biggest_process(hourly_cp, cpuser, today)
print_pretty(hourly_cp, daily_total, top_one, top_summ, cpuser)
if __name__ == "__main__":
main()