File: //opt/sharedrads/blockip
#!/usr/lib/rads/venv/bin/python3
"""wrapper script for blocking/unblocking IPs
on shared via ipset
Also ensures that the IPs are safe to block by
importing the imh-whcheck module from imh-fail2ban"""
import sys
import subprocess
import syslog
import os
import pwd
import sqlite3
import logging
from socket import AddressFamily # pylint:disable=no-name-in-module
from argparse import ArgumentParser
from datetime import datetime, timedelta, timezone
import psutil
from netaddr import IPNetwork
DB_PATH = "/etc/rads/blocked_ips.db"
LOG_PATH = "/var/log/blockip.log"
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.FileHandler(LOG_PATH),
],
)
logger = logging.getLogger('blockip')
def log_action(msg, level='info', console_msg=None):
"""
Log action to both syslog and log file
If console_msg is provided, print that to console
"""
try:
user = os.getlogin()
except Exception:
try:
user = pwd.getpwuid(os.getuid()).pw_name
except Exception:
user = 'unknown'
full_msg = f"[{user}] {msg}"
# Log to syslog
syslog.openlog('blockip')
syslog.syslog(full_msg)
syslog.closelog()
# Log to file
log_func = getattr(logger, level.lower())
log_func(full_msg)
# Only print to console if console_msg is provided
if console_msg:
print(console_msg)
def init_db():
"""Initialize SQLite database and create table if it doesn't exist"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(
'''CREATE TABLE IF NOT EXISTS blocked_ips
(ip TEXT PRIMARY KEY,
reason TEXT,
blocked_at TEXT,
expires_at TEXT)'''
)
conn.commit()
conn.close()
def add_ip_to_db(ip, reason, blocked_at, expires_at):
"""Add IP entry to database"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(
'''INSERT OR REPLACE INTO blocked_ips
(ip, reason, blocked_at, expires_at)
VALUES (?, ?, ?, ?)''',
(ip, reason, blocked_at, expires_at),
)
conn.commit()
conn.close()
def remove_ip_from_db(ip):
"""Remove IP entry from database"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('DELETE FROM blocked_ips WHERE ip = ?', (ip,))
conn.commit()
conn.close()
def get_expired_ips():
"""Get list of expired IPs"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
c.execute('SELECT ip FROM blocked_ips WHERE expires_at <= ?', (now,))
expired_ips = [row[0] for row in c.fetchall()]
conn.close()
return expired_ips
def remove_expired_ips(expired_ips):
"""Remove expired IPs from database"""
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.executemany(
'DELETE FROM blocked_ips WHERE ip = ?', [(ip,) for ip in expired_ips]
)
conn.commit()
conn.close()
def parse_cli():
"""
Parse CLI arguments
"""
parser = ArgumentParser(description="ipset wrapper for IMH shared firewall")
mgroup = parser.add_mutually_exclusive_group(required=True)
mgroup.add_argument(
'--block',
'-d',
action='store',
nargs=2,
metavar=("IP/CIDR", "REASON"),
help="IP Address or network to block + reason/comment",
)
mgroup.add_argument(
'--unblock',
'-u',
action='store',
metavar="IP/CIDR",
help="IP Address or network to unblock",
)
parser.add_argument(
'--time',
'-t',
type=str,
help="Required block duration (e.g., 7d, 4h, 30m)",
)
mgroup.add_argument(
'--clean',
action='store_true',
help="Unblock IPs whose expiration time has passed (used by cron)",
)
parser.add_argument(
"--skip-relayhosts",
"-s",
action="store_true",
help="skip checking /etc/relayhosts when blocking an IP",
)
return parser.parse_args()
def parse_time_duration(duration):
"""Parse time duration string into timedelta"""
units = {'m': 'minutes', 'h': 'hours', 'd': 'days'}
unit = duration[-1]
value = int(duration[:-1])
if unit not in units:
raise ValueError("Invalid time unit. Use m/h/d.")
return timedelta(**{units[unit]: value})
def check_ignore(ip):
"""
Check if @ip is safe block, or should be ignored
This uses the imh-whcheck script from imh-fail2ban
returns True if we should ignore, otherwise False
"""
cmd = ['/etc/fail2ban/filter.d/ignorecommands/imh-whcheck', ip]
try:
return subprocess.call(cmd) == 0
except OSError as exc:
print(exc, file=sys.stderr)
return False
def check_relayhosts(addr: str):
"""Check /etc/relayhosts for matches"""
try:
with open('/etc/relayhosts', encoding='utf-8') as f:
for tline in f.readlines():
tline = tline.strip()
if addr in IPNetwork(tline):
return True
except OSError as exc:
print(exc, file=sys.stderr)
return False
return False
def check_local(ip):
"""
Check if @ip is local
"""
local_ips = [
IPNetwork(x[0].address)
for x in list(psutil.net_if_addrs().values())
if x[0].family is AddressFamily.AF_INET
]
return ip in local_ips
def write_syslog(msg):
"""
Track actions via syslog and logging servers
"""
syslog.openlog('blockip')
syslog.syslog(msg)
syslog.closelog()
def ipset(cmd: str, ip, setname='blacklist', comment=''):
"""
Run ipset with specified @cmd for @ip on @setname
"""
try:
ret = subprocess.run(
['/sbin/ipset', cmd, setname, str(ip)],
encoding='utf-8',
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL,
check=False,
)
except Exception as e:
error_msg = f"ERROR: Failed to execute ipset: {str(e)}"
log_action(error_msg, 'error')
return
if ret.returncode == 0:
log_msg = f"{setname} {cmd} {ip}"
if comment:
log_msg += f" - {comment}"
log_action(log_msg)
else:
error_msg = f"FAILED: {setname} {cmd} {ip}: {ret.stderr}"
log_action(error_msg, 'error')
def unblock_expired_ips():
"""Unblock IPs whose expiration time has passed"""
expired_ips = get_expired_ips()
for ip in expired_ips:
ipset('del', ip, comment='Cleaned Expired IP')
log_msg = f"Cleaned Expired IP {ip}"
log_action(log_msg)
print(f"Unblocked IP {ip} (expired)")
remove_expired_ips(expired_ips)
def _main():
"""Entry point"""
try:
# Ensure log directory exists
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
# Initialize database
init_db()
args = parse_cli()
if args.block and not args.time:
error_msg = "ERROR: --time is required when using --block"
log_action(error_msg, 'error', console_msg=error_msg)
sys.exit(3)
if args.clean:
log_action("Starting cleanup of expired IPs")
unblock_expired_ips()
log_action("Finished cleanup of expired IPs")
return # Exit after cron task finishes
ip_list = args.block if args.block else [args.unblock]
try:
tip = IPNetwork(ip_list[0])
except Exception as e:
error_msg = f"ERROR: {str(e)}"
log_action(error_msg, 'error')
sys.exit(1)
if tip.size > 1024:
error_msg = "ERROR: Using networks larger than /22 is not allowed!"
log_action(error_msg, 'error', console_msg=error_msg)
sys.exit(2)
elif check_local(str(tip.ip)):
error_msg = f"ERROR: Unable to block local IP {tip}"
log_action(error_msg, 'error', console_msg=error_msg)
sys.exit(2)
elif check_ignore(str(tip.ip)):
error_msg = (
f"ERROR: Unable to block IP {tip}, found in fail2ban whitelist"
)
log_action(error_msg, 'error', console_msg=error_msg)
sys.exit(2)
elif not args.skip_relayhosts and check_relayhosts(str(tip.ip)):
error_msg = (
f"ERROR: Unable to block IP {tip}, found in /etc/relayhosts. "
"Add -s/--skip-relayhosts to skip this check."
)
log_action(error_msg, 'error', console_msg=error_msg)
sys.exit(2)
if args.block:
comment = ip_list[1]
expiry_msg = ""
blocked_at = datetime.now(timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
if args.time:
duration = parse_time_duration(args.time)
expires_at = datetime.now(timezone.utc) + duration
expiry_iso = expires_at.strftime("%Y-%m-%dT%H:%M:%SZ")
expiry_msg = f" [Block expires at {expiry_iso}]"
add_ip_to_db(str(tip), comment, blocked_at, expiry_iso)
log_action(
f"Added IP {tip} to database with expiry {expiry_iso}"
)
ipset('add', tip, comment=comment)
log_action(
f"Blocked IP {tip} ({comment}){expiry_msg}",
console_msg=f"SUCCESS: Blocked IP {tip} "
f"({comment}){expiry_msg}",
)
elif args.unblock:
ipset('del', tip)
remove_ip_from_db(str(tip))
log_action(
f"Unblocked IP {tip}",
console_msg=f"SUCCESS: Unblocked IP {tip}",
)
except Exception as e:
error_msg = f"CRITICAL ERROR: {str(e)}"
log_action(error_msg, 'critical')
sys.exit(1)
if __name__ == '__main__':
_main()