Key Takeaways
subprocess.runis the foundation: Every CLI tool you already know (docker,git,systemctl,rsync) becomes scriptable viasubprocess.run. It’s Python’s bridge to the OS.httpxreplacedrequests: httpx is async-native, ships with HTTP/2 support, and has a nearly identical API to requests. Use it for all HTTP in new Python 3.12+ code.pathlib.Pathfor all file operations:Path('/var/log').glob('*.log')is cleaner and safer thanos.listdir. Stop usingos.path.join— it’s been obsolete since Python 3.4.richfor DevOps output: Pretty-print tables, progress bars, live dashboards, and coloured logs in terminal output with zero effort. Transforms scripts fromprint()noise into professional tools.
Introduction
Direct Answer: How do I use Python for DevOps automation and scripting in 2026?
Install the essential DevOps libraries with pip install httpx fabric rich pydantic typer. For running shell commands, use subprocess.run(['docker', 'ps'], capture_output=True, text=True, check=True) — this runs any CLI tool, captures stdout/stderr, and raises CalledProcessError on failure. For HTTP API calls, use httpx.get('http://localhost:11434/api/version').json() for sync or async with httpx.AsyncClient() as client: await client.get(url) for async. For SSH automation on remote servers, use Fabric: conn = Connection('user@server.example.com'); result = conn.run('systemctl status nginx', hide=True). For file handling, use pathlib.Path('/etc/nginx').read_text() and Path('output').write_text(content). Python 3.12 on Ubuntu 24.04 ships with all standard library tools — only the third-party libraries need installation via pip.
“The best shell script is a Python script. Shell scripts get brittle past 50 lines. Python gives you data structures, error handling, testing, and the entire PyPI ecosystem — with exactly the same access to system commands via subprocess.”
This guide covers the six patterns that cover 90% of DevOps automation work: running commands, processing files, calling APIs, SSH into servers, monitoring systems, and generating reports.
Setup
# Create a dedicated DevOps scripts directory
mkdir -p ~/devops-scripts
cd ~/devops-scripts
# Install the core DevOps libraries
pip install httpx fabric rich pydantic typer --break-system-packages
# Verify Python version
python3 --version
Expected output:
Python 3.12.3
Part 1: Running Shell Commands with subprocess
subprocess.run is the correct way to run shell commands in Python 3.12. The os.system() and os.popen() patterns are obsolete.
# subprocess_patterns.py
import subprocess
import json
from pathlib import Path
# ── Pattern 1: Run command, check it succeeded, get output ────────────────
def run_command(cmd: list[str]) -> str:
"""Run a command and return stdout. Raises on non-zero exit."""
result = subprocess.run(
cmd,
capture_output=True, # Capture stdout and stderr
text=True, # Return strings not bytes
check=True # Raise CalledProcessError on non-zero exit
)
return result.stdout.strip()
print(run_command(["docker", "--version"]))
Expected output:
Docker version 27.3.1, build ce12230
# ── Pattern 2: Handle errors gracefully ──────────────────────────────────
def run_safe(cmd: list[str]) -> tuple[bool, str, str]:
"""Run command, return (success, stdout, stderr)."""
result = subprocess.run(cmd, capture_output=True, text=True)
return result.returncode == 0, result.stdout.strip(), result.stderr.strip()
ok, out, err = run_safe(["systemctl", "is-active", "nginx"])
print(f"Nginx active: {ok}, status: {out}")
ok, out, err = run_safe(["systemctl", "is-active", "nonexistent-service"])
print(f"Fake service active: {ok}, status: {out}")
Expected output:
Nginx active: True, status: active
Fake service active: False, status: inactive
# ── Pattern 3: Parse JSON output from CLI tools ───────────────────────────
def docker_ps() -> list[dict]:
"""Return running Docker containers as a list of dicts."""
raw = run_command(["docker", "ps", "--format", "json"])
# docker ps --format json outputs one JSON object per line
return [json.loads(line) for line in raw.splitlines() if line.strip()]
containers = docker_ps()
for c in containers:
print(f" {c.get('Names', 'unknown'):30s} {c.get('Status', '')}")
Expected output:
prod-stack-nginx-1 Up 2 hours (healthy)
prod-stack-app-1 Up 2 hours (healthy)
prod-stack-db-1 Up 2 hours (healthy)
# ── Pattern 4: Streaming output (long-running commands) ───────────────────
def stream_command(cmd: list[str]) -> int:
"""Run command and print output in real time. Returns exit code."""
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1 # Line-buffered
) as proc:
for line in proc.stdout:
print(line, end="")
return proc.returncode
# Example: stream docker build output
# exit_code = stream_command(["docker", "build", "-t", "myapp", "."])
Part 2: File and Log Processing
# file_processing.py
from pathlib import Path
from datetime import datetime, timedelta
import re
# ── Pattern 1: Find and process log files ────────────────────────────────
log_dir = Path("/var/log/nginx")
if log_dir.exists():
log_files = sorted(log_dir.glob("*.log"))
print(f"Found {len(log_files)} Nginx log files:")
for f in log_files:
size_kb = f.stat().st_size / 1024
print(f" {f.name:40s} {size_kb:.1f} KB")
Expected output:
Found 2 Nginx log files:
access.log 847.3 KB
error.log 12.1 KB
# ── Pattern 2: Parse Nginx access log — extract 5xx errors ───────────────
def parse_nginx_errors(log_path: Path, hours: int = 1) -> list[dict]:
"""Find 5xx errors in Nginx access log from the last N hours."""
cutoff = datetime.now() - timedelta(hours=hours)
errors = []
# Nginx default log format:
# 127.0.0.1 - - [22/Apr/2026:09:00:01 +0000] "GET /api/ HTTP/1.1" 200 1234
pattern = re.compile(
r'(?P<ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] "(?P<request>[^"]+)" '
r'(?P<status>\d+) (?P<bytes>\d+)'
)
try:
with log_path.open() as f:
for line in f:
m = pattern.match(line)
if not m:
continue
status = int(m.group("status"))
if status < 500:
continue
try:
ts = datetime.strptime(m.group("time"), "%d/%b/%Y:%H:%M:%S %z")
if ts.replace(tzinfo=None) < cutoff:
continue
except ValueError:
continue
errors.append({
"ip": m.group("ip"),
"time": m.group("time"),
"request": m.group("request"),
"status": status,
})
except FileNotFoundError:
pass
return errors
errors = parse_nginx_errors(Path("/var/log/nginx/access.log"), hours=24)
print(f"5xx errors in last 24h: {len(errors)}")
for e in errors[:5]:
print(f" [{e['time']}] {e['status']} — {e['request'][:60]}")
Expected output:
5xx errors in last 24h: 0
(A healthy server should have zero 5xx errors.)
# ── Pattern 3: Config file management ─────────────────────────────────────
def update_env_value(env_file: Path, key: str, value: str) -> bool:
"""Update a KEY=value in a .env file. Returns True if changed."""
if not env_file.exists():
env_file.write_text(f"{key}={value}\n")
return True
content = env_file.read_text()
pattern = re.compile(rf"^{re.escape(key)}=.*$", re.MULTILINE)
if pattern.search(content):
new_content = pattern.sub(f"{key}={value}", content)
changed = new_content != content
else:
new_content = content.rstrip("\n") + f"\n{key}={value}\n"
changed = True
if changed:
# Write atomically — write to temp file then rename
tmp = env_file.with_suffix(".tmp")
tmp.write_text(new_content)
tmp.replace(env_file)
return changed
env = Path("/tmp/test.env")
env.write_text("APP_ENV=development\nDB_PORT=5432\n")
changed = update_env_value(env, "APP_ENV", "production")
print(f"Changed: {changed}")
print(env.read_text())
Expected output:
Changed: True
APP_ENV=production
DB_PORT=5432
Part 3: REST API Calls with httpx
# api_automation.py
import httpx
import asyncio
from typing import Optional
# ── Pattern 1: Synchronous API call ──────────────────────────────────────
def check_ollama_status() -> dict:
"""Check if local Ollama instance is running and return version info."""
try:
resp = httpx.get("http://localhost:11434/api/version", timeout=3.0)
resp.raise_for_status()
return {"status": "running", "version": resp.json().get("version")}
except httpx.ConnectError:
return {"status": "offline", "version": None}
except httpx.HTTPStatusError as e:
return {"status": "error", "code": e.response.status_code}
result = check_ollama_status()
print(f"Ollama: {result}")
Expected output (if Ollama is running):
Ollama: {'status': 'running', 'version': '0.5.12'}
# ── Pattern 2: Async health checks — check multiple services in parallel ──
async def check_service(name: str, url: str, timeout: float = 3.0) -> dict:
"""Check one HTTP service's health. Returns status dict."""
async with httpx.AsyncClient(timeout=timeout) as client:
try:
resp = await client.get(url)
return {
"name": name,
"status": "up" if resp.status_code < 400 else "degraded",
"code": resp.status_code,
"latency_ms": resp.elapsed.total_seconds() * 1000
}
except (httpx.ConnectError, httpx.TimeoutException):
return {"name": name, "status": "down", "code": None, "latency_ms": None}
async def check_all_services(services: list[tuple[str, str]]) -> list[dict]:
"""Check all services concurrently."""
tasks = [check_service(name, url) for name, url in services]
return await asyncio.gather(*tasks)
# Define your local services
services = [
("Nginx", "http://localhost:80/health"),
("API", "http://localhost:3000/health"),
("Ollama", "http://localhost:11434/api/version"),
("Open WebUI", "http://localhost:3000"),
]
results = asyncio.run(check_all_services(services))
for r in results:
icon = "✓" if r["status"] == "up" else "✗"
latency = f"{r['latency_ms']:.0f}ms" if r["latency_ms"] else "—"
print(f" {icon} {r['name']:20s} {r['status']:8s} {latency}")
Expected output:
✓ Nginx up 12ms
✓ API up 8ms
✓ Ollama up 5ms
✗ Open WebUI down —
# ── Pattern 3: Retry with backoff ─────────────────────────────────────────
import time
def wait_for_service(url: str, max_wait: int = 60, interval: int = 5) -> bool:
"""Poll a URL until it responds OK or timeout. Returns True on success."""
deadline = time.monotonic() + max_wait
attempt = 0
while time.monotonic() < deadline:
attempt += 1
try:
resp = httpx.get(url, timeout=3.0)
if resp.status_code < 400:
print(f" ✓ Service ready after {attempt} attempt(s)")
return True
except (httpx.ConnectError, httpx.TimeoutException):
pass
remaining = deadline - time.monotonic()
if remaining > interval:
print(f" Attempt {attempt}: not ready, retrying in {interval}s...")
time.sleep(interval)
return False
# Wait for PostgreSQL API health endpoint after deploy
# ready = wait_for_service("http://localhost:3000/health", max_wait=120)
Part 4: SSH Automation with Fabric
# fabric_automation.py
# pip install fabric
from fabric import Connection, Config
from pathlib import Path
import io
# ── Pattern 1: Run commands on a remote server ────────────────────────────
def get_server_info(host: str, user: str = "ubuntu") -> dict:
"""Collect basic server info via SSH."""
with Connection(
host=host,
user=user,
connect_kwargs={"key_filename": str(Path.home() / ".ssh" / "id_ed25519")}
) as conn:
uptime = conn.run("uptime -p", hide=True).stdout.strip()
mem = conn.run("free -h | awk 'NR==2{print $2\" total, \"$3\" used\"}'", hide=True).stdout.strip()
disk = conn.run("df -h / | awk 'NR==2{print $4\" free of \"$2}'", hide=True).stdout.strip()
os_ver = conn.run("lsb_release -d | cut -f2", hide=True).stdout.strip()
load = conn.run("cat /proc/loadavg | cut -d' ' -f1-3", hide=True).stdout.strip()
return {
"host": host,
"os": os_ver,
"uptime": uptime,
"memory": mem,
"disk": disk,
"load_avg": load
}
# info = get_server_info("10.0.0.100")
# print(info)
Expected output (when run against a real server):
{
'host': '10.0.0.100',
'os': 'Ubuntu 24.04.2 LTS',
'uptime': 'up 3 days, 4 hours, 12 minutes',
'memory': '7.7Gi total, 2.3Gi used',
'disk': '45G free of 79G',
'load_avg': '0.12 0.08 0.05'
}
# ── Pattern 2: Deploy an application ─────────────────────────────────────
def deploy_app(host: str, user: str, app_dir: str = "/opt/myapp") -> bool:
"""
Deploy latest application version via SSH.
Steps: git pull → docker compose pull → docker compose up -d → health check
"""
with Connection(host, user=user) as conn:
print(f"Deploying to {host}...")
# Pull latest code
result = conn.run(f"cd {app_dir} && git pull origin main", hide=True)
commit = conn.run(
f"cd {app_dir} && git log -1 --format='%h %s'", hide=True
).stdout.strip()
print(f" Code: {commit}")
# Pull new Docker images
conn.run(f"cd {app_dir} && docker compose pull", hide=True)
print(" Images pulled")
# Rolling restart
conn.run(
f"cd {app_dir} && docker compose up -d --no-deps app",
hide=True
)
print(" Containers restarted")
# Wait for health check
import time
for attempt in range(12): # 60 second timeout
result = conn.run(
"docker compose ps --format json | python3 -c \""
"import json,sys; data=[json.loads(l) for l in sys.stdin if l.strip()]; "
"unhealthy=[c['Name'] for c in data if 'unhealthy' in c.get('Health','')]; "
"print('unhealthy:' + ','.join(unhealthy) if unhealthy else 'healthy')\"",
hide=True
).stdout.strip()
if result == "healthy":
print(f" ✓ Deployment successful ({attempt * 5}s)")
return True
time.sleep(5)
print(" ✗ Health check timed out")
return False
# ── Pattern 3: Upload files and run remote commands ───────────────────────
def update_config(host: str, user: str, local_config: Path, remote_path: str) -> None:
"""Upload a config file and reload the affected service."""
with Connection(host, user=user) as conn:
# Upload the file
conn.put(str(local_config), remote=remote_path)
print(f" Uploaded {local_config.name} → {remote_path}")
# Test nginx config before reload
test = conn.sudo("nginx -t", hide=True, warn=True)
if test.exited != 0:
print(f" ✗ Nginx config invalid: {test.stderr}")
return
# Reload without downtime
conn.sudo("systemctl reload nginx", hide=True)
print(" ✓ Nginx reloaded")
Part 5: System Monitoring Script
# system_monitor.py
# A complete server monitoring script with rich terminal output
import subprocess
import json
import shutil
from pathlib import Path
from datetime import datetime
try:
from rich.console import Console
from rich.table import Table
from rich import box
HAS_RICH = True
except ImportError:
HAS_RICH = False
def collect_metrics() -> dict:
"""Collect system metrics using standard Linux tools."""
def run(cmd):
r = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return r.stdout.strip()
# CPU load average
load = run("cat /proc/loadavg").split()[:3]
# Memory
mem_lines = run("free -b").splitlines()
mem_vals = mem_lines[1].split()
mem_total = int(mem_vals[1])
mem_used = int(mem_vals[2])
mem_pct = round(mem_used / mem_total * 100, 1)
# Disk usage for /
disk_line = run("df -B1 / | tail -1").split()
disk_total = int(disk_line[1])
disk_used = int(disk_line[2])
disk_pct = round(disk_used / disk_total * 100, 1)
# Docker containers
containers = []
raw = run("docker ps --format json 2>/dev/null")
if raw:
for line in raw.splitlines():
try:
c = json.loads(line)
containers.append({
"name": c.get("Names", ""),
"image": c.get("Image", ""),
"status": c.get("Status", ""),
"health": "✓" if "healthy" in c.get("Status", "") else
"✗" if "unhealthy" in c.get("Status", "") else "~"
})
except json.JSONDecodeError:
pass
# Key services
services = {}
for svc in ["nginx", "postgresql", "ollama", "docker"]:
r = subprocess.run(
["systemctl", "is-active", svc],
capture_output=True, text=True
)
services[svc] = r.stdout.strip()
return {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"hostname": run("hostname"),
"load": load,
"memory": {"total_gb": mem_total / 1e9, "used_gb": mem_used / 1e9, "pct": mem_pct},
"disk": {"total_gb": disk_total / 1e9, "used_gb": disk_used / 1e9, "pct": disk_pct},
"containers": containers,
"services": services,
}
def print_report(metrics: dict) -> None:
"""Print formatted system report."""
if not HAS_RICH:
# Plain text fallback
print(f"\n=== Server Monitor: {metrics['hostname']} @ {metrics['timestamp']} ===")
m = metrics["memory"]
d = metrics["disk"]
print(f"Memory: {m['used_gb']:.1f}/{m['total_gb']:.1f}GB ({m['pct']}%)")
print(f"Disk: {d['used_gb']:.1f}/{d['total_gb']:.1f}GB ({d['pct']}%)")
print(f"Load: {' '.join(metrics['load'])}")
print("\nServices:", {k: v for k, v in metrics['services'].items()})
return
console = Console()
console.print(f"\n[bold cyan]Server Monitor[/] — [dim]{metrics['hostname']}[/] @ {metrics['timestamp']}")
# System table
table = Table(box=box.SIMPLE, show_header=True)
table.add_column("Metric", style="bold")
table.add_column("Value", justify="right")
table.add_column("Bar", min_width=20)
m = metrics["memory"]
bar_mem = "█" * int(m["pct"] / 5) + "░" * (20 - int(m["pct"] / 5))
color = "red" if m["pct"] > 90 else "yellow" if m["pct"] > 70 else "green"
table.add_row("Memory", f"{m['used_gb']:.1f}/{m['total_gb']:.1f}GB ({m['pct']}%)",
f"[{color}]{bar_mem}[/]")
d = metrics["disk"]
bar_disk = "█" * int(d["pct"] / 5) + "░" * (20 - int(d["pct"] / 5))
color = "red" if d["pct"] > 90 else "yellow" if d["pct"] > 70 else "green"
table.add_row("Disk /", f"{d['used_gb']:.1f}/{d['total_gb']:.1f}GB ({d['pct']}%)",
f"[{color}]{bar_disk}[/]")
table.add_row("Load (1/5/15m)", " / ".join(metrics["load"]), "")
console.print(table)
# Services
svc_table = Table(title="Services", box=box.SIMPLE)
svc_table.add_column("Service")
svc_table.add_column("Status")
for name, status in metrics["services"].items():
color = "green" if status == "active" else "red"
svc_table.add_row(name, f"[{color}]{status}[/]")
console.print(svc_table)
# Containers
if metrics["containers"]:
c_table = Table(title="Docker Containers", box=box.SIMPLE)
c_table.add_column("Name")
c_table.add_column("Image")
c_table.add_column("Status")
c_table.add_column("Health", justify="center")
for c in metrics["containers"]:
c_table.add_row(c["name"], c["image"][:40], c["status"], c["health"])
console.print(c_table)
if __name__ == "__main__":
metrics = collect_metrics()
print_report(metrics)
python3 system_monitor.py
Expected output:
Server Monitor — hetzner-cx22 @ 2026-04-22 10:30:00
Metric Value Bar
─────────────────────────────────────────────
Memory 1.8/3.8GB (47.2%) ████████████░░░░░░░░
Disk / 8.1/38.6GB (21.0%) ████░░░░░░░░░░░░░░░░
Load (1/5/15m) 0.23 / 0.18 / 0.15
Services
─────────────────
nginx active
postgresql active
ollama active
docker active
Docker Containers
──────────────────────────────────────────────────────────────
prod-stack-nginx-1 nginx:1.27-alpine Up 3 hours (healthy) ✓
prod-stack-app-1 prod-stack-app Up 3 hours (healthy) ✓
prod-stack-db-1 postgres:17-alpine Up 3 hours (healthy) ✓
Part 6: Cron-Ready Automation Script
#!/usr/bin/env python3
# backup_and_alert.py
# Cron-friendly: runs silently on success, prints errors for cron to email
import subprocess
import sys
import json
from pathlib import Path
from datetime import datetime
BACKUP_DIR = Path("/opt/backups")
SLACK_WEBHOOK = "" # Optional: set your webhook URL
def backup_postgres(db_name: str) -> Path:
"""Dump PostgreSQL database to compressed file."""
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = BACKUP_DIR / f"{db_name}_{date_str}.pgdump"
result = subprocess.run(
["sudo", "-u", "postgres", "pg_dump",
"--format=custom", "--compress=9",
f"--file={backup_path}", db_name],
capture_output=True, text=True
)
if result.returncode != 0:
raise RuntimeError(f"pg_dump failed: {result.stderr}")
return backup_path
def cleanup_old_backups(days: int = 7) -> int:
"""Remove backup files older than N days. Returns count deleted."""
import time
cutoff = time.time() - (days * 86400)
deleted = 0
for f in BACKUP_DIR.glob("*.pgdump"):
if f.stat().st_mtime < cutoff:
f.unlink()
deleted += 1
return deleted
def send_slack_alert(message: str) -> None:
"""Send a Slack webhook notification."""
if not SLACK_WEBHOOK:
return
import httpx
try:
httpx.post(SLACK_WEBHOOK, json={"text": message}, timeout=5.0)
except Exception:
pass
if __name__ == "__main__":
try:
path = backup_postgres("myapp")
size_mb = path.stat().st_size / 1e6
deleted = cleanup_old_backups(days=7)
# Silent success — cron only emails on stdout/stderr output
# Uncomment for verbose mode:
# print(f"Backup OK: {path.name} ({size_mb:.1f}MB). Deleted {deleted} old backups.")
sys.exit(0)
except Exception as e:
msg = f"BACKUP FAILED on {Path('/etc/hostname').read_text().strip()}: {e}"
print(msg, file=sys.stderr)
send_slack_alert(f"🚨 {msg}")
sys.exit(1)
# Make executable and add to cron
chmod +x ~/devops-scripts/backup_and_alert.py
# Run at 3 AM daily
echo "0 3 * * * root /usr/bin/python3 /root/devops-scripts/backup_and_alert.py" | \
sudo tee /etc/cron.d/python-backup
Troubleshooting
ModuleNotFoundError: No module named 'fabric'
Fix: pip install fabric --break-system-packages (Ubuntu 24.04 uses system Python with externally-managed environments).
subprocess command works in shell but fails in script
Cause: Shell expansion (~, *, $VAR) doesn’t work when cmd is a list.
Fix: Either use shell=True (less safe) or expand manually:
# Wrong: subprocess.run(["ls", "~/logs"])
# Correct:
import os
subprocess.run(["ls", os.path.expanduser("~/logs")])
# Or:
subprocess.run("ls ~/logs", shell=True, capture_output=True, text=True)
PermissionError when writing to /etc/ or /var/
Cause: Script running as non-root user.
Fix: Use subprocess.run(["sudo", "tee", "/etc/nginx/sites-enabled/myapp"]) to write with sudo, or run the script as root for system-level automation tasks.
Conclusion
Python is now your DevOps automation layer: subprocess runs any CLI tool and captures output, httpx handles async health checks across all your services in parallel, Fabric SSHes into remote servers for deployments and config pushes, and rich turns raw monitoring data into a professional dashboard. All of this runs on the same Python 3.12 that ships with Ubuntu 24.04.
Connect this to your CI/CD pipeline with GitHub Actions CI/CD Tutorial to run these scripts automatically on every deployment, or pair with Build a REST API with FastAPI to expose monitoring data via an HTTP endpoint.
People Also Ask
Should I use Python or Bash for DevOps automation?
Use Bash for simple, linear scripts under 20 lines that chain CLI tools together. Use Python for anything more complex: error handling, data parsing, API calls, SSH automation, testing, or scripts that need to be maintained long-term. The inflection point is when you need conditionals, loops over data structures, or error recovery — Bash becomes brittle quickly, while Python’s subprocess module gives you the same CLI access with the full language available.
What is the difference between subprocess.run and subprocess.Popen?
subprocess.run is the high-level function for running a command and waiting for it to finish — use this for 95% of DevOps scripting. subprocess.Popen is the low-level class for more complex scenarios: streaming output in real time while also processing it, bidirectional stdin/stdout communication, or running a process in the background while continuing other work. Always prefer subprocess.run unless you specifically need Popen’s lower-level control.
How do I securely pass passwords to subprocess commands?
Never pass passwords as command-line arguments — they appear in ps aux output. Use environment variables instead:
env = {**os.environ, "PGPASSWORD": db_password}
subprocess.run(["pg_dump", "mydb"], env=env, check=True)
Or use subprocess.run with input= for tools that read passwords from stdin: subprocess.run(["mysql", "-u", "root", "-p"], input=f"{password}\n", text=True).
Further Reading
- Build a REST API with FastAPI — from scripts to a production Python API
- GitHub Actions CI/CD — run Python automation scripts in CI pipelines
- Ubuntu 24.04 LTS Server Setup Checklist — the servers these scripts manage
- SSH Hardening Guide 2026 — secure the SSH connections Fabric uses
Tested on: Ubuntu 24.04 LTS (Hetzner CX22). Python 3.12.3, httpx 0.28.1, fabric 3.2.2, rich 13.9.4. Last verified: April 22, 2026.