// practical snippets

Code Examples

Практические фрагменты кода для самостоятельного обучения. Это не готовые атакующие инструменты и не копии чужих проектов: каждый пример ограничен безопасной задачей, работает с локальными файлами или явно заданными собственными целями и показывает инженерный подход к defensive security.

read-onlylocal-firstdefensiveredacted evidencecontrolled lab

// why code

Не только теория

Каждый пример показывает маленький проверяемый кусок логики, а не абстрактное описание. Опасные классы кода не публикуются: credential abuse, keylogging, C2, destructive payloads. Формат подходит для лабораторных стендов, личных заметок и defensive-checklist.

// snippets

Учебные фрагменты

foundations Web/API

HTTP security headers checker

Минимальный read-only чекер security headers для собственного сайта или лабораторного стенда.

Safety boundary: Принимает один URL, не делает crawling, не отправляет payload и не меняет состояние.

from urllib.parse import urlparse
import requests

REQUIRED_HEADERS = {
    "strict-transport-security": "HSTS снижает риск downgrade на HTTP",
    "content-security-policy": "CSP ограничивает выполнение чужого кода",
    "x-frame-options": "защита от clickjacking для старых клиентов",
    "referrer-policy": "меньше утечек URL через Referer",
}


def check_headers(url: str) -> dict[str, str]:
    parsed = urlparse(url)
    if parsed.scheme not in {"https", "http"} or not parsed.netloc:
        raise ValueError("Передайте полный URL: https://example.com")

    response = requests.get(url, timeout=8, allow_redirects=True)
    headers = {k.lower(): v for k, v in response.headers.items()}

    return {
        name: "ok" if name in headers else f"missing: {reason}"
        for name, reason in REQUIRED_HEADERS.items()
    }


if __name__ == "__main__":
    for header, status in check_headers("https://example.com").items():
        print(f"{header}: {status}")
foundations Network

DNS inventory snapshot

Собирает базовую карту DNS-записей домена для инвентаризации периметра.

Safety boundary: Только DNS-запросы к публичным резолверам; не сканирует IP и не трогает сервисы.

import dns.resolver

RECORD_TYPES = ["A", "AAAA", "MX", "TXT", "NS", "CAA"]


def resolve_domain(domain: str) -> dict[str, list[str]]:
    resolver = dns.resolver.Resolver()
    resolver.lifetime = 4
    result: dict[str, list[str]] = {}

    for record_type in RECORD_TYPES:
        try:
            answers = resolver.resolve(domain, record_type)
            result[record_type] = sorted(str(answer).strip() for answer in answers)
        except Exception as exc:
            result[record_type] = [f"not available: {exc.__class__.__name__}"]

    return result


for rtype, values in resolve_domain("example.com").items():
    print(f"[{rtype}]")
    for value in values:
        print(" ", value)
foundations Cryptography

Hash shape identifier

Учебный классификатор формы хэша: помогает отличить encoding от hash и не путать это с шифрованием.

Safety boundary: Не взламывает хэши и не делает brute force; только проверяет длину и алфавит.

import re

PATTERNS = {
    "md5-like": re.compile(r"^[a-f0-9]{32}$", re.I),
    "sha1-like": re.compile(r"^[a-f0-9]{40}$", re.I),
    "sha256-like": re.compile(r"^[a-f0-9]{64}$", re.I),
    "bcrypt-like": re.compile(r"^\$2[aby]\$\d{2}\$.{53}$"),
    "argon2-like": re.compile(r"^\$argon2(id|i|d)\$"),
}


def identify(value: str) -> list[str]:
    value = value.strip()
    matches = [name for name, pattern in PATTERNS.items() if pattern.match(value)]
    return matches or ["unknown"]


samples = [
    "5d41402abc4b2a76b9719d911017c592",
    "$argon2id$v=19$m=65536,t=3,p=4$...",
]

for sample in samples:
    print(sample, "=>", ", ".join(identify(sample)))
beginner Network

Allowlisted service inventory

Показывает, как проверять только заранее разрешённые host:port пары, не превращая пример в массовый сканер.

Safety boundary: Нет диапазонов, нет многопоточности, нет fingerprinting; только allowlist владельца стенда.

import socket
from dataclasses import dataclass


@dataclass(frozen=True)
class Service:
    host: str
    port: int
    expected: str


ALLOWLIST = [
    Service("127.0.0.1", 22, "ssh"),
    Service("127.0.0.1", 443, "https"),
]


def is_open(service: Service, timeout: float = 1.0) -> bool:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return sock.connect_ex((service.host, service.port)) == 0


for service in ALLOWLIST:
    status = "open" if is_open(service) else "closed"
    print(f"{service.host}:{service.port} {service.expected} => {status}")
beginner Privacy

Metadata report for local files

Показывает, как искать видимые метаданные в локальных изображениях перед публикацией.

Safety boundary: Работает только с локальной папкой, ничего не отправляет наружу и не удаляет оригиналы.

from pathlib import Path
from PIL import Image


def image_metadata(path: Path) -> dict[str, str]:
    with Image.open(path) as img:
        return {
            "format": img.format or "unknown",
            "size": f"{img.width}x{img.height}",
            "exif_fields": str(len(img.getexif())),
        }


for file_path in Path("publish-candidates").glob("*"):
    if file_path.suffix.lower() not in {".jpg", ".jpeg", ".png", ".webp"}:
        continue
    try:
        print(file_path.name, image_metadata(file_path))
    except Exception as exc:
        print(file_path.name, "metadata read failed:", exc)
intermediate Secrets

Secrets redactor before evidence sharing

Минимальный redaction-слой для логов и evidence перед отправкой в отчёт.

Safety boundary: Не валидирует токены и не отправляет их в сторонние сервисы; только маскирует похожие значения.

import re

RULES = [
    (re.compile(r"AKIA[0-9A-Z]{16}"), "AWS_ACCESS_KEY"),
    (re.compile(r"gh[pousr]_[A-Za-z0-9_]{20,}"), "GITHUB_TOKEN"),
    (re.compile(r"(?i)(password|passwd|pwd)\s*[=:]\s*\S+"), "PASSWORD_FIELD"),
    (re.compile(r"(?i)(authorization:\s*bearer\s+)[A-Za-z0-9._-]+"), "BEARER_TOKEN"),
]


def redact(text: str) -> str:
    for pattern, label in RULES:
        text = pattern.sub(f"[REDACTED:{label}]", text)
    return text


raw_log = "Authorization: Bearer eyJhbGciOi... password=not-for-report"
print(redact(raw_log))
intermediate Cloud/Container

Docker Compose risk hints

Проверяет compose-файл на несколько частых risky-настроек.

Safety boundary: Статический анализ локального YAML; контейнеры не запускаются и не меняются.

from pathlib import Path
import yaml


def audit_compose(path: Path) -> list[str]:
    doc = yaml.safe_load(path.read_text()) or {}
    findings: list[str] = []

    for name, service in (doc.get("services") or {}).items():
        if service.get("privileged") is True:
            findings.append(f"{name}: privileged=true")
        if service.get("user") in (None, "", "0", "root"):
            findings.append(f"{name}: container may run as root")
        for volume in service.get("volumes") or []:
            if "/var/run/docker.sock" in str(volume):
                findings.append(f"{name}: docker.sock mounted")
        if service.get("network_mode") == "host":
            findings.append(f"{name}: host network mode")

    return findings


for finding in audit_compose(Path("compose.yml")):
    print("risk:", finding)
intermediate Supply Chain

SBOM package matcher

Учебный пример сопоставления SBOM-компонентов с локальным списком известных рисков.

Safety boundary: Использует локальный пример базы уязвимостей; не делает сетевых запросов и не меняет проект.

import json
from pathlib import Path

KNOWN_RISKS = {
    ("openssl", "1.0.1"): "outdated crypto library",
    ("log4j-core", "2.14.1"): "critical historical risk, upgrade required",
}


def load_components(path: Path) -> list[tuple[str, str]]:
    sbom = json.loads(path.read_text())
    components = sbom.get("components", [])
    return [(item.get("name", ""), item.get("version", "")) for item in components]


for name, version in load_components(Path("sbom.cdx.json")):
    risk = KNOWN_RISKS.get((name, version))
    if risk:
        print(f"{name}@{version}: {risk}")
intermediate Web/API

OpenAPI sensitive flow inventory

Извлекает из OpenAPI endpoints, которые обычно требуют ручной проверки авторизации и бизнес-логики.

Safety boundary: Только парсинг спецификации; не отправляет HTTP-запросы.

import json
from pathlib import Path

KEYWORDS = {
    "auth": ["login", "token", "session"],
    "account": ["user", "profile", "account"],
    "billing": ["invoice", "payment", "subscription"],
    "upload": ["upload", "file", "avatar"],
    "admin": ["admin", "role", "permission"],
}


def classify(path: str) -> list[str]:
    low = path.lower()
    return [label for label, words in KEYWORDS.items() if any(word in low for word in words)]


spec = json.loads(Path("openapi.json").read_text())
for path, methods in spec.get("paths", {}).items():
    labels = classify(path)
    if labels:
        print(path, "=>", ", ".join(labels), "methods:", ", ".join(methods.keys()))
intermediate SOC

Auth log summary for SIEM triage

Сжимает auth-логи в понятную таблицу источников ошибок входа.

Safety boundary: Читает локальный лог-файл и не публикует raw usernames; показывает агрегаты.

from collections import Counter
from pathlib import Path
import re

FAILED = re.compile(r"Failed password .* from (?P<ip>\d+\.\d+\.\d+\.\d+)")


def summarize_failed_logins(path: Path) -> Counter[str]:
    counter: Counter[str] = Counter()
    for line in path.read_text(errors="ignore").splitlines():
        match = FAILED.search(line)
        if match:
            counter[match.group("ip")] += 1
    return counter


for ip, count in summarize_failed_logins(Path("auth.log")).most_common(10):
    print(f"{ip}: {count} failed attempts")
advanced Monitoring

Service health check with clear output

Пример маленького health-check скрипта для uptime/incident monitoring.

Safety boundary: Проверяет только явно заданные URL, не сканирует сайт и не обходит авторизацию.

import time
import requests

TARGETS = {
    "main_site": "https://example.com/",
    "status_page": "https://example.com/status",
}


def check(name: str, url: str) -> str:
    started = time.perf_counter()
    try:
        response = requests.get(url, timeout=5)
        latency_ms = int((time.perf_counter() - started) * 1000)
        return f"{name}: HTTP {response.status_code}, {latency_ms} ms"
    except requests.RequestException as exc:
        return f"{name}: DOWN, {exc.__class__.__name__}"


for name, url in TARGETS.items():
    print(check(name, url))
advanced Threat Detection

Access-log feature extraction

Показывает, какие признаки можно извлечь из access-log перед rule/ML triage.

Safety boundary: Не делает выводов об атаке сам по себе; только готовит признаки для аналитика.

from dataclasses import dataclass
from urllib.parse import urlparse, parse_qs


@dataclass
class HttpFeatures:
    path_depth: int
    query_keys: int
    has_encoded_chars: bool
    looks_like_admin_path: bool


def extract_features(url: str) -> HttpFeatures:
    parsed = urlparse(url)
    path_parts = [part for part in parsed.path.split("/") if part]
    query = parse_qs(parsed.query, keep_blank_values=True)
    return HttpFeatures(
        path_depth=len(path_parts),
        query_keys=len(query),
        has_encoded_chars="%" in url,
        looks_like_admin_path=any(part in {"admin", "manage", "debug"} for part in path_parts),
    )


print(extract_features("/account/profile?id=42"))

// обновлено

Новый defensive code pack

Добавлены короткие примеры для API inventory, redacted evidence, CVE triage и AI policy gate. Они дополняют статьи и показывают, как превращать проблему в проверяемую инженерную логику.

defensive

Read-only HAR inventory

Авторский безопасный фрагмент для owned/lab-среды. Используется как идея проверки, а не как автономный атакующий инструмент.

import json
from pathlib import Path
from urllib.parse import urlsplit

OBJECT_KEYS = ("id", "userId", "accountId", "profileId", "invoiceId", "orderId")

def collect_operations(har_path: str) -> list[dict]:
    har = json.loads(Path(har_path).read_text(encoding="utf-8"))
    operations = []
    for item in har.get("log", {}).get("entries", []):
        req = item.get("request", {})
        method = req.get("method", "GET").upper()
        url = req.get("url", "")
        parsed = urlsplit(url)
        params = [p.get("name", "") for p in req.get("queryString", [])]
        object_params = [name for name in params if name in OBJECT_KEYS or name.lower().endswith("id")]
        operations.append({
            "method": method,
            "host": parsed.netloc,
            "path": parsed.path,
            "read_only": method in {"GET", "HEAD", "OPTIONS"},
            "object_params": sorted(set(object_params)),
        })
    return operations

for op in collect_operations("owned-test-session.har"):
    if op["read_only"] and op["object_params"]:
        print(op)
defensive

Evidence secret redactor

Авторский безопасный фрагмент для owned/lab-среды. Используется как идея проверки, а не как автономный атакующий инструмент.

import re

PATTERNS = [
    re.compile(r"(?i)(api[_-]?key|token|secret|password)\s*[:=]\s*['\"]?([A-Za-z0-9_\-]{12,})"),
    re.compile(r"AKIA[0-9A-Z]{16}"),
    re.compile(r"gh[pousr]_[A-Za-z0-9_]{20,}"),
]

def redact(text: str) -> str:
    result = text
    for pattern in PATTERNS:
        result = pattern.sub(lambda m: m.group(0)[:8] + "...[REDACTED]", result)
    return result

sample = "api_key='abcd1234abcd1234abcd1234' and deploy_token=ghp_exampletokenexampletoken"
print(redact(sample))
defensive

CVE remediation priority

Авторский безопасный фрагмент для owned/lab-среды. Используется как идея проверки, а не как автономный атакующий инструмент.

def priority(vuln: dict) -> str:
    score = 0
    if vuln.get("severity") in {"Critical", "High"}:
        score += 2
    if vuln.get("internet_exposed"):
        score += 2
    if vuln.get("runtime_reachable"):
        score += 2
    if vuln.get("known_exploited"):
        score += 3
    if vuln.get("fixed_version"):
        score += 1
    if score >= 6:
        return "fix-now"
    if score >= 3:
        return "schedule"
    return "monitor"

print(priority({
    "severity": "High",
    "internet_exposed": True,
    "runtime_reachable": True,
    "known_exploited": False,
    "fixed_version": "2.4.1",
}))
defensive

AI policy gate

Авторский безопасный фрагмент для owned/lab-среды. Используется как идея проверки, а не как автономный атакующий инструмент.

ALLOWED_TECHNIQUES = {
    "recon.http_headers",
    "recon.tls_inventory",
    "api.inventory_from_har",
    "session.authenticated_replay_readonly",
}

def policy_decision(step: dict) -> tuple[bool, str]:
    technique = step.get("technique")
    method = step.get("http_method", "GET").upper()
    if technique not in ALLOWED_TECHNIQUES:
        return False, "technique is not allowlisted"
    if method not in {"GET", "HEAD", "OPTIONS"}:
        return False, "state-changing replay requires approval gate"
    if step.get("target_out_of_scope"):
        return False, "target is outside verified scope"
    return True, "allowed"

print(policy_decision({"technique": "api.inventory_from_har", "http_method": "GET"}))

// обновлено

Boundary & verification code pack

Добавлен новый набор безопасных примеров: OpenAPI/HAR diff, session rotation, query budget, Gatekeeper policy, attestation verify и KPI-подсчёт для detection program. Это не эксплойты, а маленькие инженерные фрагменты, которые помогают проверять зрелость defensive-практики.

web/api

Diff OpenAPI Рё observed traffic РїРѕ path/method

Если у команды есть только OpenAPI, она видит желаемую архитектуру. Если есть только трафик, она не понимает намерение. Полезный аудит строится на diff между схемой, HAR, gateway-логами и правами ролей.

import json
from collections import defaultdict
from pathlib import Path


def openapi_ops(path: str) -> set[tuple[str, str]]:
    spec = json.loads(Path(path).read_text(encoding="utf-8"))
    result = set()
    for route, methods in spec.get("paths", {}).items():
        for method in methods:
            result.add((method.upper(), route))
    return result


def observed_ops(path: str) -> set[tuple[str, str]]:
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    result = set()
    for item in data:
        result.add((item["method"].upper(), item["path"]))
    return result


doc = openapi_ops("openapi.json")
obs = observed_ops("traffic_inventory.json")
print("observed_only", sorted(obs - doc)[:10])
print("doc_only", sorted(doc - obs)[:10])
identity

Session rotation после WebAuthn step-up

Продукт кажется современным, когда на логине есть passkeys. Но зрелая проверка смотрит дальше: как меняется session id после step-up, какие recovery flows сильнее или слабее WebAuthn и может ли stolen session обойти весь новый красивый UX.

def complete_webauthn_stepup(session: dict, user_id: str, assurance: str) -> dict:
    fresh = {
        "user_id": user_id,
        "session_id": issue_new_session_id(),
        "assurance": assurance,
        "stepup_at": now_iso(),
        "csrf_token": issue_csrf_token(),
    }
    revoke_session(session["session_id"])
    persist_session(fresh)
    return fresh


def privileged_action_allowed(session: dict, max_age_seconds: int = 900) -> bool:
    if session.get("assurance") not in {"webauthn", "mfa"}:
        return False
    return age_seconds(session.get("stepup_at")) <= max_age_seconds
visibility

Мини-pack с безопасным query budget

Хороший security visibility слой задаёт не только вопрос 'что можно спросить у хоста', но и 'какой ценой'. В зрелой среде инвентаризация, hardening и compliance-запросы отделены от тяжёлых ad hoc checks.

{
  "queries": {
    "listening_ports_inventory": {
      "query": "select pid, port, protocol, address from listening_ports;",
      "interval": 3600,
      "description": "Inventory of externally relevant listening ports",
      "platform": "linux,darwin,windows",
      "snapshot": true
    },
    "sshd_hardening_check": {
      "query": "select key, value from ssh_configs where key in ('PermitRootLogin','PasswordAuthentication');",
      "interval": 21600,
      "description": "Low-cost SSH hardening verification",
      "platform": "linux"
    }
  }
}
kubernetes

Gatekeeper constraint для запрета privileged containers

Зрелая проверка Kubernetes строится слоями: inventory и posture, policy-as-code на admission, runtime signals и проверяемый hardening diff. Только тогда misconfiguration перестаёт возвращаться в следующем deploy.

apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sPSPPrivilegedContainer
metadata:
  name: disallow-privileged
spec:
  match:
    kinds:
      - apiGroups: [""]
        kinds: ["Pod"]
    excludedNamespaces:
      - kube-system
  parameters:
    exemptImages:
      - "registry.example/trusted-debug:*"
supply chain

Проверка attestation перед релизом

Наличие SBOM ещё не означает доверие к релизу. Полезная цепочка включает digest, provenance, signed attestations, policy decision и понятный verification step перед deploy.

IMAGE="registry.example/app@sha256:REPLACE_ME"

# verify the image signature
cosign verify "$IMAGE"

# verify attached attestations / provenance
cosign verify-attestation "$IMAGE"

# optional: require annotations or policy claims
cosign verify-attestation -a env=production "$IMAGE"
soc

Простой подсчёт FP rate и stale rules

Detection engineering становится полезной для бизнеса, когда умеет честно измерять шум, скорость исправления и качество handoff. Иначе SOC просто получает всё больше правил и всё меньше доверия к ним.

from datetime import date


def rule_health(rule: dict) -> dict:
    alerts = max(rule.get("alerts", 0), 1)
    fp_rate = rule.get("false_positives", 0) / alerts
    stale = (date.today() - date.fromisoformat(rule["last_review"])).days > 90
    return {"rule_id": rule["id"], "fp_rate": round(fp_rate, 2), "stale": stale}


sample = {"id": "edge-404-burst", "alerts": 42, "false_positives": 18, "last_review": "2026-03-20"}
print(rule_health(sample))