File: //opt/sharedrads/rotate_ip_addresses.py
#!/usr/lib/rads/venv/bin/python3
import argparse
import collections
import functools
import logging
import shlex
import os
import re
import shutil
import sys
import textwrap
import multiprocessing
import glob
import subprocess
import netaddr
import pp_api
import rads
# Shared wrapper around subprocess.run used for every external command in this
# script: check=True raises CalledProcessError on a non-zero exit status, and
# stdout/stderr are captured and decoded as UTF-8 text.
runcmd = functools.partial(
    subprocess.run, check=True, capture_output=True, encoding='utf-8'
)
# Operator-facing checklist. It is passed to argparse as the --help description
# (see get_args) and printed again by main() right before the confirmation
# prompt. The download URL is split across two adjacent string literals, which
# Python concatenates into a single string.
description = textwrap.dedent(
"""
This script will change all users of the target IP to use one of the
provided new IP addresses.
Download and read this first: https://imhsc.imhadmin.net/modules/Fi"""
"""les/Setup%20Checklists/?download=MainIPChangeChecklist.ods
Tasks you should already have performed:
- Let support/systems know what you're about to do.
- If the target is the main IP of the box
- You've chosen one from the Shared block.
- You've licensed cpanel/softaculous on the new IP.
- Ensured that this script is being run over the backlan.
- Ensured that this script is being run in a screen.
- You've chosen a pool of dedicated IPs to use as the new shared IPs.
- Chosen IPs are marked in system center.
- A and PTR records are updated for the IPs in question.
Tasks this script will perform:
- Reconfigure networking, including the main IP if specified
- Notify customers of their new IP via power panel
- Configure cPanel for the new IP
- /var/cpanel/mainip
- /etc/wwwacct.conf
- /var/cpanel/mainips/{root,inmotion,tier2s}
- /etc/mailips - only if the target IP is here
- /etc/reservedips
- /etc/reservedipreasons
- If the main IP is the target, change inmotion/tier2s/secure IP
- Change all users on the target IP to one of the new IPs
Tasks you should perform after user rotation begins:
- Revoke the cPanel and Softaculous licenses.
- Start the HUD updating from /root/ip_change_status
- Double-check that the DNS for the secure hostname updated correctly
- Review the checklist and confirm all steps manually
"""
)
def get_users_on(ip):
    """Return the names of the cPanel users currently assigned to ``ip``.

    Every file in /var/cpanel/users is scanned for a line of the form
    ``IP=<address>``, where the address is ``ip.ip``. The special accounts
    (tier2s, inmotion, hubhost and rads.SECURE_USER) are never reported.
    """
    users_root = "/var/cpanel/users"
    wanted_line = re.compile(f"IP={re.escape(str(ip.ip))}$")
    excluded = {'tier2s', 'inmotion', 'hubhost', rads.SECURE_USER}
    matches = []
    for name in os.listdir(users_root):
        if name in excluded:
            continue
        with open(os.path.join(users_root, name), encoding='ascii') as handle:
            # any() stops at the first matching line, so each user is
            # recorded at most once.
            if any(wanted_line.match(row) is not None for row in handle):
                matches.append(name)
    return matches
def generate_work_list(destinations, user_list: list[str]):
    """Distribute users round-robin over the destination IPs.

    Users are handed out starting from the END of ``user_list`` (the same
    order as repeatedly calling ``list.pop()``), cycling through
    ``destinations`` in order. ``user_list`` itself is not modified.

    Args:
        destinations: iterable of objects exposing an ``ip`` attribute.
        user_list: user names to distribute.

    Returns:
        A ``collections.defaultdict(list)`` mapping ``str(dest.ip)`` to the
        list of users assigned to that IP. Destinations that receive no users
        have no key.

    Raises:
        ValueError: if there are users to place but no destinations. The
            previous implementation looped forever in that situation.
    """
    mapping = collections.defaultdict(list)
    if not user_list:
        return mapping
    # Materialise once so one-shot iterables work and the length is known.
    dest_ips = [str(dest.ip) for dest in destinations]
    if not dest_ips:
        raise ValueError(
            "Cannot distribute users: no destination IPs were provided"
        )
    for index, user in enumerate(reversed(user_list)):
        mapping[dest_ips[index % len(dest_ips)]].append(user)
    return mapping
def strip_quotes(string):
    """Return ``string`` with surrounding quotes and whitespace removed.

    Single quotes, double quotes, spaces, newlines and tabs are stripped from
    both ends; characters in the middle of the string are left alone.
    """
    unwanted = "'\" \n\t"
    return string.strip(unwanted)
def get_main_ip():
    """Return the server's current main IP as a ``netaddr.IPNetwork``.

    The address recorded in /var/cpanel/mainip is cross-checked against the
    IPADDR entry of ifcfg-eth0; the returned network combines that address
    with the NETMASK entry of the same file. The script exits with status 1
    when either entry is missing or when the two addresses disagree.
    """
    with open("/var/cpanel/mainip", encoding='ascii') as handle:
        cpanel_main_ip = strip_quotes(handle.read())

    # Last occurrence of each key wins, as with a plain top-to-bottom scan.
    eth0 = {"IPADDR": None, "NETMASK": None}
    with open(
        "/etc/sysconfig/network-scripts/ifcfg-eth0", encoding='ascii'
    ) as handle:
        for entry in handle:
            for key in eth0:
                if entry.startswith(f"{key}="):
                    eth0[key] = strip_quotes(entry.split('=')[1])

    address = eth0["IPADDR"]
    netmask = eth0["NETMASK"]
    if address is None or netmask is None:
        logging.critical(
            "Could not read eth0 address information, check IPADDR and NETMASK!"
        )
        sys.exit(1)
    if cpanel_main_ip != address:
        logging.critical(
            "cPanel IP (%s) and eth0 IP (%s) do not match, fix this!",
            cpanel_main_ip,
            address,
        )
        sys.exit(1)
    logging.info("Determined main IP to be %s", address)
    return netaddr.IPNetwork(f"{address}/{netmask}")
def _rewrite_prefixed_lines(path, replacements):
    """Rewrite ``path``, replacing each line that starts with a key of
    ``replacements`` by the corresponding value; other lines are kept."""
    updated = []
    with open(path, encoding='ascii') as f:
        for line in f:
            for prefix, new_line in replacements.items():
                if line.startswith(prefix):
                    updated.append(new_line)
                    break
            else:
                updated.append(line)
    with open(path, 'w', encoding='ascii') as f:
        f.write("".join(updated))


def set_main_ip(ip):
    """Set the main IP of the server to ``ip`` and reload eth0.

    Files rewritten, in order:
      - ifcfg-eth0: IPADDR and NETMASK
      - /etc/sysconfig/network: GATEWAY (first address after the network
        address, i.e. ``ip.network + 1``)
      - /var/cpanel/mainip
      - /etc/wwwacct.conf: the ``ADDR`` line

    Afterwards eth0 is cycled (ifdown/ifup), the ipaliases service is
    restarted and the operator is asked to confirm connectivity.

    Args:
        ip: the new main IP; must expose ``ip``, ``netmask`` and ``network``
            (a ``netaddr.IPNetwork`` in this script).

    Exits with status 1 if any of the networking commands fails.
    """
    _rewrite_prefixed_lines(
        "/etc/sysconfig/network-scripts/ifcfg-eth0",
        {
            "IPADDR=": f"IPADDR={ip.ip}\n",
            "NETMASK=": f"NETMASK={ip.netmask}\n",
        },
    )
    logging.info("Wrote new ifcfg-eth0")

    _rewrite_prefixed_lines(
        "/etc/sysconfig/network",
        {"GATEWAY=": f"GATEWAY={ip.network + 1}\n"},
    )
    logging.info("Wrote new network config")

    with open("/var/cpanel/mainip", 'w', encoding='ascii') as f:
        f.write(str(ip.ip))
    logging.info("Wrote new /var/cpanel/mainip")

    _rewrite_prefixed_lines("/etc/wwwacct.conf", {"ADDR ": f"ADDR {ip.ip}\n"})
    logging.info("Wrote new wwwacct.conf")

    try:
        runcmd(['ifdown', 'eth0'])
        runcmd(['ifup', 'eth0'])
        runcmd(['service', 'ipaliases', 'restart'])
    except subprocess.CalledProcessError as exc:
        # exc.cmd is the argv list that failed. exc.args is
        # (returncode, cmd), which shlex.join() cannot handle, so using it
        # here would raise TypeError and hide the real failure.
        logging.critical(
            "Failed to configure eth0 when executing %s! STDOUT: %r STDERR: %r",
            shlex.join(exc.cmd),
            exc.stdout,
            exc.stderr,
        )
        sys.exit(1)
    logging.info("Completed reload of eth0 networking.")
    input(
        f"Please ping the new main IP {ip} and press ENTER when connectivity "
        "has been confirmed. If the IP does not begin to ping within 2 "
        "minutes, press Ctrl+C to abort and check main IP configuration."
    )
def add_new_ips(destinations):
    """Register every network in ``destinations`` as a shared IP.

    Each address is appended to /var/cpanel/mainips/root (which is then
    copied over the tier2s and reseller lists), /etc/reservedips,
    /etc/reservedipreasons and /etc/ips. Finally the ipaliases service is
    restarted; a failed restart is logged as critical but does not abort.
    """
    mainips_dir = "/var/cpanel/mainips"
    root_list = os.path.join(mainips_dir, "root")
    # The reseller account name depends on the hostname.
    reseller = 'hubhost' if 'hub' in os.uname()[1] else 'inmotion'

    if not os.path.isdir(mainips_dir):
        os.mkdir(mainips_dir)

    with open(root_list, 'a', encoding='ascii') as handle:
        for dest in destinations:
            handle.write(f"{dest.ip}\n")
            logging.info("Added %s to mainips/root", dest.ip)

    shutil.copy(root_list, os.path.join(mainips_dir, "tier2s"))
    logging.info("Copied mainips/root to mainips/tier2s")
    shutil.copy(root_list, os.path.join(mainips_dir, reseller))
    logging.info("Copied mainips/root to mainips/%s", reseller)

    with open("/etc/reservedips", 'a', encoding='ascii') as handle:
        for dest in destinations:
            handle.write(f"{dest.ip}\n")
            logging.info("Added %s to reservedips", dest.ip)

    with open("/etc/reservedipreasons", 'a', encoding='ascii') as handle:
        for dest in destinations:
            handle.write(f"{dest.ip}=backup mail ip\n")
            logging.info("Added %s to reservedipreasons", dest.ip)

    with open("/etc/ips", 'a', encoding='ascii') as handle:
        for dest in destinations:
            handle.write(f"{dest.ip}:{dest.netmask}:{dest.broadcast}\n")
            logging.info("Added %s to /etc/ips", dest)

    try:
        runcmd(['service', 'ipaliases', 'restart'])
    except subprocess.CalledProcessError as exc:
        logging.critical(
            "Error restarting ipaliases after adding IPs. "
            "STDOUT: %s STDERR: %s",
            exc.stdout,
            exc.stderr,
        )
    else:
        logging.info("Added IPs to the server successfully")
def _drop_lines(path, should_drop):
    """Rewrite ``path`` in place, omitting lines for which ``should_drop``
    returns true."""
    with open(path, encoding='ascii') as handle:
        kept = [row for row in handle if not should_drop(row)]
    with open(path, 'w', encoding='ascii') as handle:
        handle.write("".join(kept))


def remove_from_server(ip):
    """Remove a non-main IP from the server's IP bookkeeping files.

    ``ip.ip`` is removed from /var/cpanel/mainips/root (when that directory
    exists; the result is copied over the tier2s and reseller lists),
    /etc/reservedips, /etc/reservedipreasons and /etc/ips.
    """
    mainips_dir = "/var/cpanel/mainips"
    # The reseller account name depends on the hostname.
    reseller = 'hubhost' if 'hub' in os.uname()[1] else 'inmotion'
    exact_line = f"{ip.ip}\n"

    if os.path.isdir(mainips_dir):
        root_list = os.path.join(mainips_dir, "root")
        _drop_lines(root_list, lambda row: row == exact_line)
        shutil.copy(root_list, os.path.join(mainips_dir, "tier2s"))
        logging.info("Copied mainips/root to mainips/tier2s")
        shutil.copy(root_list, os.path.join(mainips_dir, reseller))
        logging.info("Copied mainips/root to mainips/%s", reseller)

    # reservedips holds bare addresses; the other two files use
    # "<ip>=<reason>" and "<ip>:<netmask>:<broadcast>" lines respectively.
    _drop_lines("/etc/reservedips", lambda row: row == exact_line)
    _drop_lines(
        "/etc/reservedipreasons", lambda row: row.startswith(f"{ip.ip}=")
    )
    _drop_lines("/etc/ips", lambda row: row.startswith(f"{ip.ip}:"))
def validate_ip(address):
    """Parse ``address`` into a ``netaddr.IPNetwork`` or exit with status 1.

    The script exits when ``address`` cannot be parsed, or when the resulting
    network contains two addresses or fewer (for example when no CIDR suffix
    was supplied).
    """
    try:
        network = netaddr.IPNetwork(address)
    except netaddr.core.AddrFormatError:
        logging.critical("The specified address '%s' is not valid!", address)
        sys.exit(1)
    if network.size > 2:
        return network
    logging.critical(
        "The prefix of specified address '%s' is too short to be a usable "
        "address. You must enter a valid IP with CIDR netmask.",
        network,
    )
    sys.exit(1)
def get_args(args=None):
    """Parse command line arguments.

    Args:
        args: optional list of argument strings; when ``None`` the process
            command line (``sys.argv``) is parsed instead.

    Returns:
        The ``argparse.Namespace`` with ``target_ip``, ``shared_ip`` (a
        list), ``main_ip`` and ``loglevel``.
    """
    cli = argparse.ArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    cli.add_argument(
        'target_ip',
        help="The IP to be changed. Should currently be black-holed. Must be "
        "specified in CIDR notation.",
    )
    cli.add_argument(
        'shared_ip',
        nargs='+',
        help="The shared IPs over which users will be distributed. These must "
        "be specified in CIDR notation.",
    )
    cli.add_argument(
        '-m',
        '--mainip',
        dest='main_ip',
        help="The new main IP. If the target is not the main IP, this is "
        "optional. Must be specified in CIDR notation.",
    )
    cli.add_argument(
        '--loglevel',
        default="INFO",
        help="Specify a log level, defaults to INFO.",
    )
    # parse_args(None) reads sys.argv, so one call covers both cases.
    return cli.parse_args(args)
def mail_users(work_dict):
    """Notify every user in ``work_dict`` of their new IP via Power Panel.

    ``work_dict`` maps a new IP (string) to the users moving to it. One
    ``notification.send`` call is made per user; the template id depends on
    whether the hostname contains 'hub'. Failures are logged and do not stop
    the remaining notifications.
    """
    panel = pp_api.PowerPanel()
    template_id = 539 if 'hub' in os.uname()[1] else 538
    for new_ip, accounts in work_dict.items():
        for account in accounts:
            response = panel.call(
                "notification.send",
                template=template_id,
                cpanelUser=account,
                variable_1=new_ip,
            )
            if response.status == 0:
                logging.info(
                    "Success: %s notified of IP change %s", account, new_ip
                )
                continue
            logging.error(
                "Failed to send notification to %s with IP %s because %s.",
                account,
                new_ip,
                response.message,
            )
def change_user_ips(work_dict):
    """Move each user in ``work_dict`` to their new IP with setsiteip.

    ``work_dict`` maps a new IP (string) to the users that should use it.
    After every user, whether the change succeeded or not, the overall
    progress percentage is written to /root/ip_change_status.
    """
    total_users = sum(len(accounts) for accounts in work_dict.values())
    command_prefix = ['/usr/local/cpanel/bin/setsiteip', '-u']
    processed = 0.0
    for new_ip, accounts in work_dict.items():
        for account in accounts:
            try:
                result = runcmd([*command_prefix, account, new_ip])
            except subprocess.CalledProcessError as exc:
                logging.error(
                    "Failed to change IP for %s to %s. STDOUT: %r STDERR: %r",
                    account,
                    new_ip,
                    exc.stdout,
                    exc.stderr,
                )
            else:
                # More than a few characters of output is treated as a
                # possible problem report from setsiteip.
                if len(result.stdout) > 5:
                    logging.warning(
                        "Possible issue setting %s to %s: %r",
                        account,
                        new_ip,
                        result.stdout,
                    )
                else:
                    logging.info("Success: %s set to %s", account, new_ip)
            processed += 1
            percent = processed / total_users * 100
            with open('/root/ip_change_status', 'w', encoding='ascii') as status:
                status.write(f"{percent:0.2f}%\n")
def change_main_users(ip):
    """Point the special accounts at the new main IP.

    The accounts are tier2s, rads.SECURE_USER and the reseller account
    ('hubhost' when the hostname contains 'hub', otherwise 'inmotion').
    ``ip`` is passed through unchanged as the key of the work dict.
    """
    reseller = 'hubhost' if 'hub' in os.uname()[1] else 'inmotion'
    main_accounts = ['tier2s', rads.SECURE_USER, reseller]
    change_user_ips({ip: main_accounts})
def parse_userdomains(path='/etc/userdomains'):
    """Return a ``{domain: user}`` mapping parsed from ``path``.

    Each line is expected to look like ``domain: user``. The wildcard entry
    (``*``) is ignored. Blank lines are skipped silently and lines that do
    not contain exactly one colon are skipped with a warning, instead of
    aborting the whole script with an unpacking error.

    Args:
        path: file to parse; defaults to /etc/userdomains.
    """
    dom_owners = {}
    with open(path, encoding='ascii') as handle:
        for line in handle:
            if not line.strip():
                continue
            parts = line.split(':')
            if len(parts) != 2:
                logging.warning(
                    "Skipping malformed line in %s: %r", path, line
                )
                continue
            domain = parts[0].strip()
            user = parts[1].strip()
            if domain != '*':
                dom_owners[domain] = user
    return dom_owners
def update_dns(work_dict, target):
    """Manually update zone files for records that setsiteip does not
    touch (SPF).

    For every /var/named/*.db zone whose owner (per the module-level
    DOM_OWNERS mapping) appears in ``work_dict``, each occurrence of the
    target address is replaced with the owner's new IP and the zone is then
    synced with ``/scripts/dnscluster synczone <zone>``.

    Args:
        work_dict: mapping of new IP (string) -> list of users moving to it.
        target: the IP being retired; must expose an ``ip`` attribute.

    Zones that cannot be matched to a user in ``work_dict``, look truncated
    (10 lines or fewer) or cannot be read/written are logged and skipped.
    """
    dnscluster = "/scripts/dnscluster"
    if not os.path.exists(dnscluster):
        logging.error("Unable to find cPanel tools!")
        return

    # Match the target only as a complete address, so that retiring 1.2.3.4
    # does not also rewrite 1.2.3.40 or 11.2.3.4.
    old_ip = re.compile(
        rf"(?<![\d.]){re.escape(str(target.ip))}(?!\.?\d)"
    )
    # user -> new IP; if a user is listed more than once, the last IP wins.
    new_ip_for = {
        user.rstrip(): ip for ip, users in work_dict.items() for user in users
    }

    for zone_file in glob.glob("/var/named/*.db"):
        # Strip only the trailing ".db"; splitting on ".db" would truncate
        # zone names such as "foo.dbs.com".
        zone_name = os.path.basename(zone_file).removesuffix(".db")
        try:
            owner = DOM_OWNERS[zone_name]
        except KeyError:
            logging.error(
                'Unable to determine owner of %s from /etc/userdomains',
                zone_name,
            )
            owner = None
        new_ip = new_ip_for.get(owner)
        if new_ip is None:
            logging.warning("Unable to determine new IP of %s", zone_name)
            continue
        try:
            with open(zone_file, encoding='ascii') as infile:
                lines = [
                    old_ip.sub(lambda _match, repl=new_ip: repl, line)
                    for line in infile
                ]
            if len(lines) <= 10:
                logging.warning(
                    "Zone file for %s is suspiciously short! Skipping...",
                    zone_name,
                )
                continue
            with open(zone_file, 'w', encoding='ascii') as outfile:
                outfile.writelines(lines)
        except (OSError, UnicodeError):
            # UnicodeError covers zone files with non-ASCII bytes, which
            # would otherwise kill the whole DNS worker.
            logging.error("Unable to read/write %s", zone_file)
            continue
        logging.info("Replaced DNS records for %s", zone_name)
        # NOTE(review): "synczone <zone>" is the positional form documented
        # for /scripts/dnscluster - confirm against the installed cPanel.
        try:
            runcmd([dnscluster, "synczone", zone_name])
        except (OSError, subprocess.CalledProcessError) as exc:
            logging.error(
                "Failed to sync zone %s to the DNS cluster: %s", zone_name, exc
            )
def main():
    """Run the interactive IP rotation.

    Steps:
      1. Parse arguments, set up logging and validate every supplied IP.
      2. Refuse to continue if the main IP that would remain in place is the
         target IP itself.
      3. Print the checklist and a summary of the changes, then require the
         operator to type 'YES'.
      4. Either switch the main IP (when the target is the current main IP)
         or remove the target from the server, then add the new shared IPs.
      5. In parallel worker processes: mail users, change user IPs and fix
         up leftover DNS records; rebuild the httpd configuration before and
         after the workers finish, then gracefully restart httpd.
    """
    args = get_args()
    rads.setup_logging(
        path="/var/log/ip_address_rotation.log",
        fmt="[%(asctime)s] %(levelname)s: %(message)s",
        loglevel=getattr(logging, args.loglevel.upper(), logging.INFO),
        print_out=sys.stderr,
    )
    target = validate_ip(args.target_ip)
    logging.info("Validated target IP '%s'", target)
    destinations = [validate_ip(addr) for addr in args.shared_ip]
    logging.info(
        "Validated destination IP(s) '[%s]'", ", ".join(map(str, destinations))
    )
    current_main_ip = get_main_ip()
    if args.main_ip is not None:
        main_ip = validate_ip(args.main_ip)
        logging.info("Validated main IP '%s'", main_ip)
    else:
        main_ip = current_main_ip
        logging.info("Not changing main IP '%s'", main_ip)
    # The address being retired can never be the resulting main IP.
    if target.ip == main_ip.ip:
        logging.critical(
            "The target IP (%s) is the current main IP of the server! You "
            "must specify a new main IP!",
            target,
        )
        sys.exit(1)
    affected_users = get_users_on(target)
    logging.info("Pre-flight checks complete. Confirming...")
    print(description)
    print("The following changes will be made to the server:")
    print(
        f"- All {len(affected_users)} users on {target.ip} will be migrated "
        "to one of the following:"
    )
    for ip in destinations:
        print(f" - {ip.ip} with netmask {ip.netmask}")
    # set_main_ip() writes GATEWAY as the first address after the network
    # address, so show the same value here rather than the network address.
    print(
        f"- The main IP of the server will be '{main_ip.ip}',",
        f"the netmask will be '{main_ip.netmask}',",
        f"and the gateway will be '{main_ip.network + 1}'.",
    )
    print()
    print("The current status percentage will be in /root/ip_change_status")
    print()
    confirmation = input(
        "Please take a moment to thoroughly read and ensure you understand "
        "all of the information above. If you have confirmed that this script"
        " is about to do what you want, type 'YES' to continue: "
    )
    if confirmation != 'YES':
        logging.critical("User did not enter 'YES' when prompted. Aborting.")
        sys.exit(1)
    logging.info("User has confirmed all changes. Proceeding.")
    work = generate_work_list(destinations, affected_users)
    if target.ip == current_main_ip.ip:
        set_main_ip(main_ip)
        change_main_users(str(main_ip.ip))
    else:
        remove_from_server(target)
    add_new_ips(destinations)
    rebuild = ["/scripts/rebuildhttpdconf"]
    mailer = multiprocessing.Process(target=mail_users, args=(work,))
    mailer.start()
    ip_changer = multiprocessing.Process(target=change_user_ips, args=(work,))
    ip_changer.start()
    dns_updater = multiprocessing.Process(
        target=update_dns, args=(work, target)
    )
    dns_updater.start()
    # First rebuild runs while the workers are still busy; a second one
    # follows once they have all finished.
    try:
        runcmd(rebuild)
    except subprocess.CalledProcessError:
        logging.error('rebuildhttpdconf failed')
    mailer.join()
    logging.info("Finished mailing")
    ip_changer.join()
    logging.info("Finished changing IPs")
    dns_updater.join()
    logging.info("Finished updating extra DNS records")
    try:
        runcmd(rebuild)
    except subprocess.CalledProcessError:
        logging.error('rebuildhttpdconf failed')
    try:
        runcmd(["/scripts/restartsrv_httpd", "--graceful"])
    except subprocess.CalledProcessError:
        logging.error('failed to reload httpd')
if __name__ == "__main__":
    # update_dns() looks domains up in DOM_OWNERS as a module-level global,
    # so it has to be populated before main() starts the worker processes.
    # NOTE(review): presumably relies on the workers inheriting this global
    # (fork start method) - confirm if the start method ever changes.
    DOM_OWNERS = parse_userdomains()
    main()