Vucense

Python for DevOps and Automation 2026: Scripts, APIs, and Infrastructure

🟡Intermediate

Use Python for DevOps automation in 2026. Covers shell scripting with subprocess, file and log processing, REST API calls, SSH automation with Fabric, system monitoring, and Ansible alternatives.

Author: Sarah Jenkins, Open-Source Community & Ecosystem Lead

Reading time: 18 min · Build time: 20 min


Key Takeaways

  • subprocess.run is the foundation: Every CLI tool you already know (docker, git, systemctl, rsync) becomes scriptable via subprocess.run. It’s Python’s bridge to the OS.
  • httpx over requests: httpx is async-native, supports HTTP/2 (via the httpx[http2] extra), and has a nearly identical API to requests. A strong default for HTTP in new Python 3.12+ code.
  • pathlib.Path for all file operations: Path('/var/log').glob('*.log') is cleaner and safer than os.listdir. Prefer it to os.path.join; pathlib has been in the standard library since Python 3.4.
  • rich for DevOps output: Pretty-print tables, progress bars, live dashboards, and coloured logs in terminal output with zero effort. Transforms scripts from print() noise into professional tools.
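The pathlib takeaway above in one runnable sketch (the temporary directory and file names are illustrative):

```python
from pathlib import Path
import tempfile

# Create a throwaway directory with two fake log files
tmp = Path(tempfile.mkdtemp())
(tmp / "access.log").write_text("GET / 200\n")
(tmp / "error.log").write_text("")

# Glob, stat, and read in a few lines — no os.path.join, no open() boilerplate
for f in sorted(tmp.glob("*.log")):
    print(f"{f.name}: {f.stat().st_size} bytes")

print((tmp / "access.log").read_text().strip())
```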

Introduction

Direct Answer: How do I use Python for DevOps automation and scripting in 2026?

Install the essential DevOps libraries with pip install httpx fabric rich pydantic typer. For running shell commands, use subprocess.run(['docker', 'ps'], capture_output=True, text=True, check=True) — this runs any CLI tool, captures stdout/stderr, and raises CalledProcessError on failure. For HTTP API calls, use httpx.get('http://localhost:11434/api/version').json() for sync or async with httpx.AsyncClient() as client: await client.get(url) for async. For SSH automation on remote servers, use Fabric: conn = Connection('user@server.example.com'); result = conn.run('systemctl status nginx', hide=True). For file handling, use pathlib.Path('/etc/nginx').read_text() and Path('output').write_text(content). Python 3.12 on Ubuntu 24.04 ships with all standard library tools — only the third-party libraries need installation via pip.

“The best shell script is a Python script. Shell scripts get brittle past 50 lines. Python gives you data structures, error handling, testing, and the entire PyPI ecosystem — with exactly the same access to system commands via subprocess.”

This guide walks through the six patterns that handle roughly 90% of DevOps automation work: running commands, processing files, calling APIs, SSHing into servers, monitoring systems, and generating reports.


Setup

# Create a dedicated DevOps scripts directory
mkdir -p ~/devops-scripts
cd ~/devops-scripts

# Install the core DevOps libraries
pip install httpx fabric rich pydantic typer --break-system-packages

# Verify Python version
python3 --version

Expected output:

Python 3.12.3

Part 1: Running Shell Commands with subprocess

subprocess.run is the correct way to run shell commands in Python 3.12. The os.system() and os.popen() patterns are obsolete.

# subprocess_patterns.py
import subprocess
import json
from pathlib import Path

# ── Pattern 1: Run command, check it succeeded, get output ────────────────
def run_command(cmd: list[str]) -> str:
    """Run a command and return stdout. Raises on non-zero exit."""
    result = subprocess.run(
        cmd,
        capture_output=True,   # Capture stdout and stderr
        text=True,             # Return strings not bytes
        check=True             # Raise CalledProcessError on non-zero exit
    )
    return result.stdout.strip()

print(run_command(["docker", "--version"]))

Expected output:

Docker version 27.3.1, build ce12230

# ── Pattern 2: Handle errors gracefully ──────────────────────────────────
def run_safe(cmd: list[str]) -> tuple[bool, str, str]:
    """Run command, return (success, stdout, stderr)."""
    result = subprocess.run(cmd, capture_output=True, text=True)
    return result.returncode == 0, result.stdout.strip(), result.stderr.strip()

ok, out, err = run_safe(["systemctl", "is-active", "nginx"])
print(f"Nginx active: {ok}, status: {out}")

ok, out, err = run_safe(["systemctl", "is-active", "nonexistent-service"])
print(f"Fake service active: {ok}, status: {out}")

Expected output:

Nginx active: True, status: active
Fake service active: False, status: inactive

# ── Pattern 3: Parse JSON output from CLI tools ───────────────────────────
def docker_ps() -> list[dict]:
    """Return running Docker containers as a list of dicts."""
    raw = run_command(["docker", "ps", "--format", "json"])
    # docker ps --format json outputs one JSON object per line
    return [json.loads(line) for line in raw.splitlines() if line.strip()]

containers = docker_ps()
for c in containers:
    print(f"  {c.get('Names', 'unknown'):30s} {c.get('Status', '')}")

Expected output:

  prod-stack-nginx-1             Up 2 hours (healthy)
  prod-stack-app-1               Up 2 hours (healthy)
  prod-stack-db-1                Up 2 hours (healthy)

# ── Pattern 4: Streaming output (long-running commands) ───────────────────
def stream_command(cmd: list[str]) -> int:
    """Run command and print output in real time. Returns exit code."""
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1          # Line-buffered
    ) as proc:
        for line in proc.stdout:
            print(line, end="")
    return proc.returncode

# Example: stream docker build output
# exit_code = stream_command(["docker", "build", "-t", "myapp", "."])

Part 2: File and Log Processing

# file_processing.py
from pathlib import Path
from datetime import datetime, timedelta
import re

# ── Pattern 1: Find and process log files ────────────────────────────────
log_dir = Path("/var/log/nginx")

if log_dir.exists():
    log_files = sorted(log_dir.glob("*.log"))
    print(f"Found {len(log_files)} Nginx log files:")
    for f in log_files:
        size_kb = f.stat().st_size / 1024
        print(f"  {f.name:40s} {size_kb:.1f} KB")

Expected output:

Found 2 Nginx log files:
  access.log                               847.3 KB
  error.log                                12.1 KB

# ── Pattern 2: Parse Nginx access log — extract 5xx errors ───────────────
def parse_nginx_errors(log_path: Path, hours: int = 1) -> list[dict]:
    """Find 5xx errors in Nginx access log from the last N hours."""
    # Use an aware local timestamp so it compares correctly with the
    # timezone-aware timestamps parsed from the log
    cutoff = datetime.now().astimezone() - timedelta(hours=hours)
    errors = []

    # Nginx default log format:
    # 127.0.0.1 - - [22/Apr/2026:09:00:01 +0000] "GET /api/ HTTP/1.1" 200 1234
    pattern = re.compile(
        r'(?P<ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] "(?P<request>[^"]+)" '
        r'(?P<status>\d+) (?P<bytes>\d+)'
    )

    try:
        with log_path.open() as f:
            for line in f:
                m = pattern.match(line)
                if not m:
                    continue
                status = int(m.group("status"))
                if status < 500:
                    continue
                try:
                    ts = datetime.strptime(m.group("time"), "%d/%b/%Y:%H:%M:%S %z")
                    if ts < cutoff:  # Both aware, so the comparison is valid
                        continue
                except ValueError:
                    continue
                errors.append({
                    "ip": m.group("ip"),
                    "time": m.group("time"),
                    "request": m.group("request"),
                    "status": status,
                })
    except FileNotFoundError:
        pass

    return errors

errors = parse_nginx_errors(Path("/var/log/nginx/access.log"), hours=24)
print(f"5xx errors in last 24h: {len(errors)}")
for e in errors[:5]:
    print(f"  [{e['time']}] {e['status']} {e['request'][:60]}")

Expected output:

5xx errors in last 24h: 0

(A healthy server should have zero 5xx errors.)
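For a broader view than 5xx-only filtering, a status-code histogram takes a few lines with collections.Counter. A sketch over inline sample lines (the simplified regex here captures only the status code):

```python
import re
from collections import Counter

SAMPLE = """\
127.0.0.1 - - [22/Apr/2026:09:00:01 +0000] "GET /api/ HTTP/1.1" 200 1234
127.0.0.1 - - [22/Apr/2026:09:00:02 +0000] "GET /missing HTTP/1.1" 404 153
127.0.0.1 - - [22/Apr/2026:09:00:03 +0000] "POST /api/jobs HTTP/1.1" 200 88
"""

# The status code is the 3-digit field right after the quoted request
status_re = re.compile(r'" (\d{3}) ')

def status_histogram(lines: str) -> Counter:
    return Counter(m.group(1) for m in status_re.finditer(lines))

print(status_histogram(SAMPLE))  # Counter({'200': 2, '404': 1})
```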

# ── Pattern 3: Config file management ─────────────────────────────────────
def update_env_value(env_file: Path, key: str, value: str) -> bool:
    """Update a KEY=value in a .env file. Returns True if changed."""
    if not env_file.exists():
        env_file.write_text(f"{key}={value}\n")
        return True

    content = env_file.read_text()
    pattern = re.compile(rf"^{re.escape(key)}=.*$", re.MULTILINE)

    if pattern.search(content):
        new_content = pattern.sub(f"{key}={value}", content)
        changed = new_content != content
    else:
        new_content = content.rstrip("\n") + f"\n{key}={value}\n"
        changed = True

    if changed:
        # Write atomically — write to temp file then rename
        tmp = env_file.with_suffix(".tmp")
        tmp.write_text(new_content)
        tmp.replace(env_file)

    return changed

env = Path("/tmp/test.env")
env.write_text("APP_ENV=development\nDB_PORT=5432\n")
changed = update_env_value(env, "APP_ENV", "production")
print(f"Changed: {changed}")
print(env.read_text())

Expected output:

Changed: True
APP_ENV=production
DB_PORT=5432
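The inverse operation, reading a .env file back into a dict, is equally short. A minimal sketch; it skips comments and blank lines but does no quoting or variable interpolation:

```python
from pathlib import Path

def read_env(env_file: Path) -> dict[str, str]:
    """Parse KEY=value lines into a dict, ignoring comments and blanks."""
    env = {}
    for line in env_file.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip()
    return env

env_path = Path("/tmp/test.env")
env_path.write_text("# app settings\nAPP_ENV=production\nDB_PORT=5432\n")
print(read_env(env_path))  # {'APP_ENV': 'production', 'DB_PORT': '5432'}
```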

Part 3: REST API Calls with httpx

# api_automation.py
import httpx
import asyncio

# ── Pattern 1: Synchronous API call ──────────────────────────────────────
def check_ollama_status() -> dict:
    """Check if local Ollama instance is running and return version info."""
    try:
        resp = httpx.get("http://localhost:11434/api/version", timeout=3.0)
        resp.raise_for_status()
        return {"status": "running", "version": resp.json().get("version")}
    except httpx.ConnectError:
        return {"status": "offline", "version": None}
    except httpx.HTTPStatusError as e:
        return {"status": "error", "code": e.response.status_code}

result = check_ollama_status()
print(f"Ollama: {result}")

Expected output (if Ollama is running):

Ollama: {'status': 'running', 'version': '0.5.12'}

# ── Pattern 2: Async health checks — check multiple services in parallel ──
async def check_service(name: str, url: str, timeout: float = 3.0) -> dict:
    """Check one HTTP service's health. Returns status dict."""
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            resp = await client.get(url)
            return {
                "name": name,
                "status": "up" if resp.status_code < 400 else "degraded",
                "code": resp.status_code,
                "latency_ms": resp.elapsed.total_seconds() * 1000
            }
        except (httpx.ConnectError, httpx.TimeoutException):
            return {"name": name, "status": "down", "code": None, "latency_ms": None}

async def check_all_services(services: list[tuple[str, str]]) -> list[dict]:
    """Check all services concurrently."""
    tasks = [check_service(name, url) for name, url in services]
    return await asyncio.gather(*tasks)

# Define your local services
services = [
    ("Nginx",         "http://localhost:80/health"),
    ("API",           "http://localhost:3000/health"),
    ("Ollama",        "http://localhost:11434/api/version"),
    ("Open WebUI",    "http://localhost:8080"),
]

results = asyncio.run(check_all_services(services))
for r in results:
    icon = "✓" if r["status"] == "up" else "✗"
    latency = f"{r['latency_ms']:.0f}ms" if r["latency_ms"] else "—"
    print(f"  {icon} {r['name']:20s} {r['status']:8s} {latency}")

Expected output:

  ✓ Nginx                up       12ms
  ✓ API                  up       8ms
  ✓ Ollama               up       5ms
  ✗ Open WebUI           down     —

# ── Pattern 3: Poll until a service is ready ──────────────────────────────
import time

def wait_for_service(url: str, max_wait: int = 60, interval: int = 5) -> bool:
    """Poll a URL until it responds OK or timeout. Returns True on success."""
    deadline = time.monotonic() + max_wait
    attempt = 0
    while time.monotonic() < deadline:
        attempt += 1
        try:
            resp = httpx.get(url, timeout=3.0)
            if resp.status_code < 400:
                print(f"  ✓ Service ready after {attempt} attempt(s)")
                return True
        except (httpx.ConnectError, httpx.TimeoutException):
            pass
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        print(f"  Attempt {attempt}: not ready, retrying in {interval}s...")
        time.sleep(min(interval, remaining))  # Never sleep past the deadline
    return False

# Wait for the app's health endpoint after a deploy
# ready = wait_for_service("http://localhost:3000/health", max_wait=120)
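wait_for_service polls at a fixed interval. When a service's startup time varies a lot, exponential backoff wastes fewer requests: double the delay on each attempt and cap it. A sketch of the schedule logic (the defaults here are illustrative):

```python
def backoff_delays(base: float = 1.0, factor: float = 2.0,
                   cap: float = 30.0, attempts: int = 6) -> list[float]:
    """Delay before each retry: base, base*factor, base*factor^2, ... capped."""
    return [min(base * factor ** i, cap) for i in range(attempts)]

print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]

# Usage inside a retry loop:
# for delay in backoff_delays():
#     if try_once():
#         break
#     time.sleep(delay)
```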

Part 4: SSH Automation with Fabric

# fabric_automation.py
# pip install fabric

from fabric import Connection, Config
from pathlib import Path
import io

# ── Pattern 1: Run commands on a remote server ────────────────────────────
def get_server_info(host: str, user: str = "ubuntu") -> dict:
    """Collect basic server info via SSH."""
    with Connection(
        host=host,
        user=user,
        connect_kwargs={"key_filename": str(Path.home() / ".ssh" / "id_ed25519")}
    ) as conn:
        uptime   = conn.run("uptime -p", hide=True).stdout.strip()
        mem      = conn.run("free -h | awk 'NR==2{print $2\" total, \"$3\" used\"}'", hide=True).stdout.strip()
        disk     = conn.run("df -h / | awk 'NR==2{print $4\" free of \"$2}'", hide=True).stdout.strip()
        os_ver   = conn.run("lsb_release -d | cut -f2", hide=True).stdout.strip()
        load     = conn.run("cat /proc/loadavg | cut -d' ' -f1-3", hide=True).stdout.strip()

        return {
            "host": host,
            "os": os_ver,
            "uptime": uptime,
            "memory": mem,
            "disk": disk,
            "load_avg": load
        }

# info = get_server_info("10.0.0.100")
# print(info)

Expected output (when run against a real server):

{
    'host': '10.0.0.100',
    'os': 'Ubuntu 24.04.2 LTS',
    'uptime': 'up 3 days, 4 hours, 12 minutes',
    'memory': '7.7Gi total, 2.3Gi used',
    'disk': '45G free of 79G',
    'load_avg': '0.12 0.08 0.05'
}

# ── Pattern 2: Deploy an application ─────────────────────────────────────
def deploy_app(host: str, user: str, app_dir: str = "/opt/myapp") -> bool:
    """
    Deploy latest application version via SSH.
    Steps: git pull → docker compose pull → docker compose up -d → health check
    """
    with Connection(host, user=user) as conn:
        print(f"Deploying to {host}...")

        # Pull latest code
        result = conn.run(f"cd {app_dir} && git pull origin main", hide=True)
        commit = conn.run(
            f"cd {app_dir} && git log -1 --format='%h %s'", hide=True
        ).stdout.strip()
        print(f"  Code: {commit}")

        # Pull new Docker images
        conn.run(f"cd {app_dir} && docker compose pull", hide=True)
        print("  Images pulled")

        # Rolling restart
        conn.run(
            f"cd {app_dir} && docker compose up -d --no-deps app",
            hide=True
        )
        print("  Containers restarted")

        # Wait for health check
        import time
        for attempt in range(12):  # 60 second timeout
            result = conn.run(
                "docker compose ps --format json | python3 -c \""
                "import json,sys; data=[json.loads(l) for l in sys.stdin if l.strip()]; "
                "unhealthy=[c['Name'] for c in data if 'unhealthy' in c.get('Health','')]; "
                "print('unhealthy:' + ','.join(unhealthy) if unhealthy else 'healthy')\"",
                hide=True
            ).stdout.strip()
            if result == "healthy":
                print(f"  ✓ Deployment successful ({attempt * 5}s)")
                return True
            time.sleep(5)

        print("  ✗ Health check timed out")
        return False

# ── Pattern 3: Upload files and run remote commands ───────────────────────
def update_config(host: str, user: str, local_config: Path, remote_path: str) -> None:
    """Upload a config file and reload the affected service."""
    with Connection(host, user=user) as conn:
        # Upload the file
        conn.put(str(local_config), remote=remote_path)
        print(f"  Uploaded {local_config.name} → {remote_path}")

        # Test nginx config before reload
        test = conn.sudo("nginx -t", hide=True, warn=True)
        if test.exited != 0:
            print(f"  ✗ Nginx config invalid: {test.stderr}")
            return

        # Reload without downtime
        conn.sudo("systemctl reload nginx", hide=True)
        print("  ✓ Nginx reloaded")

Part 5: System Monitoring Script

# system_monitor.py
# A complete server monitoring script with rich terminal output

import subprocess
import json
from datetime import datetime

try:
    from rich.console import Console
    from rich.table import Table
    from rich import box
    HAS_RICH = True
except ImportError:
    HAS_RICH = False

def collect_metrics() -> dict:
    """Collect system metrics using standard Linux tools."""
    def run(cmd):
        r = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        return r.stdout.strip()

    # CPU load average
    load = run("cat /proc/loadavg").split()[:3]

    # Memory
    mem_lines = run("free -b").splitlines()
    mem_vals = mem_lines[1].split()
    mem_total = int(mem_vals[1])
    mem_used  = int(mem_vals[2])
    mem_pct   = round(mem_used / mem_total * 100, 1)

    # Disk usage for /
    disk_line = run("df -B1 / | tail -1").split()
    disk_total = int(disk_line[1])
    disk_used  = int(disk_line[2])
    disk_pct   = round(disk_used / disk_total * 100, 1)

    # Docker containers
    containers = []
    raw = run("docker ps --format json 2>/dev/null")
    if raw:
        for line in raw.splitlines():
            try:
                c = json.loads(line)
                containers.append({
                    "name": c.get("Names", ""),
                    "image": c.get("Image", ""),
                    "status": c.get("Status", ""),
                    "health": "✓" if "healthy" in c.get("Status", "") else
                              "✗" if "unhealthy" in c.get("Status", "") else "~"
                })
            except json.JSONDecodeError:
                pass

    # Key services
    services = {}
    for svc in ["nginx", "postgresql", "ollama", "docker"]:
        r = subprocess.run(
            ["systemctl", "is-active", svc],
            capture_output=True, text=True
        )
        services[svc] = r.stdout.strip()

    return {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "hostname": run("hostname"),
        "load": load,
        "memory": {"total_gb": mem_total / 1e9, "used_gb": mem_used / 1e9, "pct": mem_pct},
        "disk":   {"total_gb": disk_total / 1e9, "used_gb": disk_used / 1e9, "pct": disk_pct},
        "containers": containers,
        "services": services,
    }

def print_report(metrics: dict) -> None:
    """Print formatted system report."""
    if not HAS_RICH:
        # Plain text fallback
        print(f"\n=== Server Monitor: {metrics['hostname']} @ {metrics['timestamp']} ===")
        m = metrics["memory"]
        d = metrics["disk"]
        print(f"Memory: {m['used_gb']:.1f}/{m['total_gb']:.1f}GB ({m['pct']}%)")
        print(f"Disk:   {d['used_gb']:.1f}/{d['total_gb']:.1f}GB ({d['pct']}%)")
        print(f"Load:   {' '.join(metrics['load'])}")
        print("\nServices:", {k: v for k, v in metrics['services'].items()})
        return

    console = Console()
    console.print(f"\n[bold cyan]Server Monitor[/] — [dim]{metrics['hostname']}[/] @ {metrics['timestamp']}")

    # System table
    table = Table(box=box.SIMPLE, show_header=True)
    table.add_column("Metric",  style="bold")
    table.add_column("Value",   justify="right")
    table.add_column("Bar",     min_width=20)

    m = metrics["memory"]
    bar_mem = "█" * int(m["pct"] / 5) + "░" * (20 - int(m["pct"] / 5))
    color = "red" if m["pct"] > 90 else "yellow" if m["pct"] > 70 else "green"
    table.add_row("Memory", f"{m['used_gb']:.1f}/{m['total_gb']:.1f}GB ({m['pct']}%)",
                  f"[{color}]{bar_mem}[/]")

    d = metrics["disk"]
    bar_disk = "█" * int(d["pct"] / 5) + "░" * (20 - int(d["pct"] / 5))
    color = "red" if d["pct"] > 90 else "yellow" if d["pct"] > 70 else "green"
    table.add_row("Disk /", f"{d['used_gb']:.1f}/{d['total_gb']:.1f}GB ({d['pct']}%)",
                  f"[{color}]{bar_disk}[/]")

    table.add_row("Load (1/5/15m)", " / ".join(metrics["load"]), "")
    console.print(table)

    # Services
    svc_table = Table(title="Services", box=box.SIMPLE)
    svc_table.add_column("Service")
    svc_table.add_column("Status")
    for name, status in metrics["services"].items():
        color = "green" if status == "active" else "red"
        svc_table.add_row(name, f"[{color}]{status}[/]")
    console.print(svc_table)

    # Containers
    if metrics["containers"]:
        c_table = Table(title="Docker Containers", box=box.SIMPLE)
        c_table.add_column("Name")
        c_table.add_column("Image")
        c_table.add_column("Status")
        c_table.add_column("Health", justify="center")
        for c in metrics["containers"]:
            c_table.add_row(c["name"], c["image"][:40], c["status"], c["health"])
        console.print(c_table)

if __name__ == "__main__":
    metrics = collect_metrics()
    print_report(metrics)

Run it:

python3 system_monitor.py

Expected output:

Server Monitor — hetzner-cx22 @ 2026-04-22 10:30:00

 Metric           Value                    Bar
 ─────────────────────────────────────────────
 Memory           1.8/3.8GB (47.2%)        ████████████░░░░░░░░
 Disk /           8.1/38.6GB (21.0%)       ████░░░░░░░░░░░░░░░░
 Load (1/5/15m)   0.23 / 0.18 / 0.15

 Services
 ─────────────────
 nginx       active
 postgresql  active
 ollama      active
 docker      active

 Docker Containers
 ──────────────────────────────────────────────────────────────
 prod-stack-nginx-1   nginx:1.27-alpine   Up 3 hours (healthy)   ✓
 prod-stack-app-1     prod-stack-app      Up 3 hours (healthy)   ✓
 prod-stack-db-1      postgres:17-alpine  Up 3 hours (healthy)   ✓
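Parsing free and df output works on any Linux box, but for disk usage the standard library can answer directly and portably. A sketch with shutil.disk_usage:

```python
import shutil

def disk_pct(path: str = "/") -> tuple[float, float, float]:
    """Return (total_gb, used_gb, pct_used) for the filesystem at `path`."""
    usage = shutil.disk_usage(path)  # named tuple: total, used, free (bytes)
    return (
        usage.total / 1e9,
        usage.used / 1e9,
        round(usage.used / usage.total * 100, 1),
    )

total, used, pct = disk_pct("/")
print(f"Disk /: {used:.1f}/{total:.1f}GB ({pct}%)")
```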

Part 6: Cron-Ready Automation Script

#!/usr/bin/env python3
# backup_and_alert.py
# Cron-friendly: runs silently on success, prints errors for cron to email

import subprocess
import sys
import json
from pathlib import Path
from datetime import datetime

BACKUP_DIR = Path("/opt/backups")
SLACK_WEBHOOK = ""  # Optional: set your webhook URL

def backup_postgres(db_name: str) -> Path:
    """Dump PostgreSQL database to compressed file."""
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = BACKUP_DIR / f"{db_name}_{date_str}.pgdump"

    result = subprocess.run(
        ["sudo", "-u", "postgres", "pg_dump",
         "--format=custom", "--compress=9",
         f"--file={backup_path}", db_name],
        capture_output=True, text=True
    )

    if result.returncode != 0:
        raise RuntimeError(f"pg_dump failed: {result.stderr}")

    return backup_path

def cleanup_old_backups(days: int = 7) -> int:
    """Remove backup files older than N days. Returns count deleted."""
    import time
    cutoff = time.time() - (days * 86400)
    deleted = 0
    for f in BACKUP_DIR.glob("*.pgdump"):
        if f.stat().st_mtime < cutoff:
            f.unlink()
            deleted += 1
    return deleted

def send_slack_alert(message: str) -> None:
    """Send a Slack webhook notification."""
    if not SLACK_WEBHOOK:
        return
    import httpx
    try:
        httpx.post(SLACK_WEBHOOK, json={"text": message}, timeout=5.0)
    except Exception:
        pass

if __name__ == "__main__":
    try:
        path = backup_postgres("myapp")
        size_mb = path.stat().st_size / 1e6
        deleted = cleanup_old_backups(days=7)
        # Silent success — cron only emails on stdout/stderr output
        # Uncomment for verbose mode:
        # print(f"Backup OK: {path.name} ({size_mb:.1f}MB). Deleted {deleted} old backups.")
        sys.exit(0)
    except Exception as e:
        msg = f"BACKUP FAILED on {Path('/etc/hostname').read_text().strip()}: {e}"
        print(msg, file=sys.stderr)
        send_slack_alert(f"🚨 {msg}")
        sys.exit(1)

# Make executable and add to cron
chmod +x ~/devops-scripts/backup_and_alert.py

# Run at 3 AM daily
echo "0 3 * * * root /usr/bin/python3 /root/devops-scripts/backup_and_alert.py" | \
  sudo tee /etc/cron.d/python-backup

Troubleshooting

ModuleNotFoundError: No module named 'fabric'

Fix: pip install fabric --break-system-packages (Ubuntu 24.04 marks the system Python as an externally managed environment). Alternatively, install into a virtual environment: python3 -m venv ~/devops-venv && ~/devops-venv/bin/pip install fabric.

subprocess command works in shell but fails in script

Cause: Shell expansion (~, *, $VAR) doesn’t work when cmd is a list. Fix: Either use shell=True (less safe) or expand manually:

# Wrong: subprocess.run(["ls", "~/logs"])  # the "~" is passed literally
# Correct:
import os
import subprocess

subprocess.run(["ls", os.path.expanduser("~/logs")])
# Or let the shell expand it (quote user input carefully):
subprocess.run("ls ~/logs", shell=True, capture_output=True, text=True)

PermissionError when writing to /etc/ or /var/

Cause: Script running as non-root user. Fix: Use subprocess.run(["sudo", "tee", "/etc/nginx/sites-enabled/myapp"]) to write with sudo, or run the script as root for system-level automation tasks.
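The sudo tee pattern works because tee reads the file body from stdin, which subprocess supplies via input=. A sketch without sudo, writing to /tmp (on a real server, prepend "sudo" and use the system path; the config snippet is illustrative):

```python
import subprocess

content = "server { listen 8080; }\n"  # illustrative config content

# Equivalent to: echo "$content" | tee /tmp/demo.conf
subprocess.run(
    ["tee", "/tmp/demo.conf"],    # prepend "sudo" for paths under /etc or /var
    input=content,                # tee reads the body from stdin
    text=True,
    check=True,
    stdout=subprocess.DEVNULL,    # tee echoes to stdout; silence it
)

print(open("/tmp/demo.conf").read(), end="")
```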


Conclusion

Python is now your DevOps automation layer: subprocess runs any CLI tool and captures output, httpx handles async health checks across all your services in parallel, Fabric SSHes into remote servers for deployments and config pushes, and rich turns raw monitoring data into a professional dashboard. All of this runs on the same Python 3.12 that ships with Ubuntu 24.04.

Connect this to your CI/CD pipeline with GitHub Actions CI/CD Tutorial to run these scripts automatically on every deployment, or pair with Build a REST API with FastAPI to expose monitoring data via an HTTP endpoint.


People Also Ask

Should I use Python or Bash for DevOps automation?

Use Bash for simple, linear scripts under 20 lines that chain CLI tools together. Use Python for anything more complex: error handling, data parsing, API calls, SSH automation, testing, or scripts that need to be maintained long-term. The inflection point is when you need conditionals, loops over data structures, or error recovery — Bash becomes brittle quickly, while Python’s subprocess module gives you the same CLI access with the full language available.

What is the difference between subprocess.run and subprocess.Popen?

subprocess.run is the high-level function for running a command and waiting for it to finish — use this for 95% of DevOps scripting. subprocess.Popen is the low-level class for more complex scenarios: streaming output in real time while also processing it, bidirectional stdin/stdout communication, or running a process in the background while continuing other work. Always prefer subprocess.run unless you specifically need Popen’s lower-level control.
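A minimal Popen sketch of the background case: start a child process, continue other work, then collect its output with communicate():

```python
import subprocess
import sys

# Start a child process without blocking the parent
proc = subprocess.Popen(
    [sys.executable, "-c", "print('child done')"],
    stdout=subprocess.PIPE,
    text=True,
)

# ... the parent is free to do other work here ...

stdout, _ = proc.communicate(timeout=10)  # wait and collect output
print(stdout.strip(), "| exit code:", proc.returncode)  # child done | exit code: 0
```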

How do I securely pass passwords to subprocess commands?

Never pass passwords as command-line arguments — they appear in ps aux output. Use environment variables instead:

import os
import subprocess

env = {**os.environ, "PGPASSWORD": db_password}
subprocess.run(["pg_dump", "mydb"], env=env, check=True)

Or use subprocess.run with input= for tools that read passwords from stdin: subprocess.run(["mysql", "-u", "root", "-p"], input=f"{password}\n", text=True).



Tested on: Ubuntu 24.04 LTS (Hetzner CX22). Python 3.12.3, httpx 0.28.1, fabric 3.2.2, rich 13.9.4. Last verified: April 22, 2026.
