// practical snippets
Code Examples
Практические фрагменты кода для самостоятельного обучения. Это не готовые атакующие инструменты и не копии чужих проектов: каждый пример ограничен безопасной задачей, работает с локальными файлами или явно заданными собственными целями и показывает инженерный подход к defensive security.
read-onlylocal-firstdefensiveredacted evidencecontrolled lab
// snippets
Учебные фрагменты
foundations
Network
DNS inventory snapshot
Собирает базовую карту DNS-записей домена для инвентаризации периметра.
Safety boundary: Только DNS-запросы к публичным резолверам; не сканирует IP и не трогает сервисы.
import dns.resolver
RECORD_TYPES = ["A", "AAAA", "MX", "TXT", "NS", "CAA"]
def resolve_domain(domain: str) -> dict[str, list[str]]:
resolver = dns.resolver.Resolver()
resolver.lifetime = 4
result: dict[str, list[str]] = {}
for record_type in RECORD_TYPES:
try:
answers = resolver.resolve(domain, record_type)
result[record_type] = sorted(str(answer).strip() for answer in answers)
except Exception as exc:
result[record_type] = [f"not available: {exc.__class__.__name__}"]
return result
for rtype, values in resolve_domain("example.com").items():
print(f"[{rtype}]")
for value in values:
print(" ", value)
foundations
Cryptography
Hash shape identifier
Учебный классификатор формы хэша: помогает отличить encoding от hash и не путать это с шифрованием.
Safety boundary: Не взламывает хэши и не делает brute force; только проверяет длину и алфавит.
import re
PATTERNS = {
"md5-like": re.compile(r"^[a-f0-9]{32}$", re.I),
"sha1-like": re.compile(r"^[a-f0-9]{40}$", re.I),
"sha256-like": re.compile(r"^[a-f0-9]{64}$", re.I),
"bcrypt-like": re.compile(r"^\$2[aby]\$\d{2}\$.{53}$"),
"argon2-like": re.compile(r"^\$argon2(id|i|d)\$"),
}
def identify(value: str) -> list[str]:
value = value.strip()
matches = [name for name, pattern in PATTERNS.items() if pattern.match(value)]
return matches or ["unknown"]
samples = [
"5d41402abc4b2a76b9719d911017c592",
"$argon2id$v=19$m=65536,t=3,p=4$...",
]
for sample in samples:
print(sample, "=>", ", ".join(identify(sample)))
beginner
Network
Allowlisted service inventory
Показывает, как проверять только заранее разрешённые host:port пары, не превращая пример в массовый сканер.
Safety boundary: Нет диапазонов, нет многопоточности, нет fingerprinting; только allowlist владельца стенда.
import socket
from dataclasses import dataclass
@dataclass(frozen=True)
class Service:
host: str
port: int
expected: str
ALLOWLIST = [
Service("127.0.0.1", 22, "ssh"),
Service("127.0.0.1", 443, "https"),
]
def is_open(service: Service, timeout: float = 1.0) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(timeout)
return sock.connect_ex((service.host, service.port)) == 0
for service in ALLOWLIST:
status = "open" if is_open(service) else "closed"
print(f"{service.host}:{service.port} {service.expected} => {status}")
beginner
Privacy
Metadata report for local files
Показывает, как искать видимые метаданные в локальных изображениях перед публикацией.
Safety boundary: Работает только с локальной папкой, ничего не отправляет наружу и не удаляет оригиналы.
from pathlib import Path
from PIL import Image
def image_metadata(path: Path) -> dict[str, str]:
with Image.open(path) as img:
return {
"format": img.format or "unknown",
"size": f"{img.width}x{img.height}",
"exif_fields": str(len(img.getexif())),
}
for file_path in Path("publish-candidates").glob("*"):
if file_path.suffix.lower() not in {".jpg", ".jpeg", ".png", ".webp"}:
continue
try:
print(file_path.name, image_metadata(file_path))
except Exception as exc:
print(file_path.name, "metadata read failed:", exc)
intermediate
Secrets
Secrets redactor before evidence sharing
Минимальный redaction-слой для логов и evidence перед отправкой в отчёт.
Safety boundary: Не валидирует токены и не отправляет их в сторонние сервисы; только маскирует похожие значения.
import re
RULES = [
(re.compile(r"AKIA[0-9A-Z]{16}"), "AWS_ACCESS_KEY"),
(re.compile(r"gh[pousr]_[A-Za-z0-9_]{20,}"), "GITHUB_TOKEN"),
(re.compile(r"(?i)(password|passwd|pwd)\s*[=:]\s*\S+"), "PASSWORD_FIELD"),
(re.compile(r"(?i)(authorization:\s*bearer\s+)[A-Za-z0-9._-]+"), "BEARER_TOKEN"),
]
def redact(text: str) -> str:
for pattern, label in RULES:
text = pattern.sub(f"[REDACTED:{label}]", text)
return text
raw_log = "Authorization: Bearer eyJhbGciOi... password=not-for-report"
print(redact(raw_log))
intermediate
Cloud/Container
Docker Compose risk hints
Проверяет compose-файл на несколько частых risky-настроек.
Safety boundary: Статический анализ локального YAML; контейнеры не запускаются и не меняются.
from pathlib import Path
import yaml
def audit_compose(path: Path) -> list[str]:
doc = yaml.safe_load(path.read_text()) or {}
findings: list[str] = []
for name, service in (doc.get("services") or {}).items():
if service.get("privileged") is True:
findings.append(f"{name}: privileged=true")
if service.get("user") in (None, "", "0", "root"):
findings.append(f"{name}: container may run as root")
for volume in service.get("volumes") or []:
if "/var/run/docker.sock" in str(volume):
findings.append(f"{name}: docker.sock mounted")
if service.get("network_mode") == "host":
findings.append(f"{name}: host network mode")
return findings
for finding in audit_compose(Path("compose.yml")):
print("risk:", finding)
intermediate
Supply Chain
SBOM package matcher
Учебный пример сопоставления SBOM-компонентов с локальным списком известных рисков.
Safety boundary: Использует локальный пример базы уязвимостей; не делает сетевых запросов и не меняет проект.
import json
from pathlib import Path
KNOWN_RISKS = {
("openssl", "1.0.1"): "outdated crypto library",
("log4j-core", "2.14.1"): "critical historical risk, upgrade required",
}
def load_components(path: Path) -> list[tuple[str, str]]:
sbom = json.loads(path.read_text())
components = sbom.get("components", [])
return [(item.get("name", ""), item.get("version", "")) for item in components]
for name, version in load_components(Path("sbom.cdx.json")):
risk = KNOWN_RISKS.get((name, version))
if risk:
print(f"{name}@{version}: {risk}")
intermediate
Web/API
OpenAPI sensitive flow inventory
Извлекает из OpenAPI endpoints, которые обычно требуют ручной проверки авторизации и бизнес-логики.
Safety boundary: Только парсинг спецификации; не отправляет HTTP-запросы.
import json
from pathlib import Path
KEYWORDS = {
"auth": ["login", "token", "session"],
"account": ["user", "profile", "account"],
"billing": ["invoice", "payment", "subscription"],
"upload": ["upload", "file", "avatar"],
"admin": ["admin", "role", "permission"],
}
def classify(path: str) -> list[str]:
low = path.lower()
return [label for label, words in KEYWORDS.items() if any(word in low for word in words)]
spec = json.loads(Path("openapi.json").read_text())
for path, methods in spec.get("paths", {}).items():
labels = classify(path)
if labels:
print(path, "=>", ", ".join(labels), "methods:", ", ".join(methods.keys()))
intermediate
SOC
Auth log summary for SIEM triage
Сжимает auth-логи в понятную таблицу источников ошибок входа.
Safety boundary: Читает локальный лог-файл и не публикует raw usernames; показывает агрегаты.
from collections import Counter
from pathlib import Path
import re
FAILED = re.compile(r"Failed password .* from (?P<ip>\d+\.\d+\.\d+\.\d+)")
def summarize_failed_logins(path: Path) -> Counter[str]:
counter: Counter[str] = Counter()
for line in path.read_text(errors="ignore").splitlines():
match = FAILED.search(line)
if match:
counter[match.group("ip")] += 1
return counter
for ip, count in summarize_failed_logins(Path("auth.log")).most_common(10):
print(f"{ip}: {count} failed attempts")
advanced
Monitoring
Service health check with clear output
Пример маленького health-check скрипта для uptime/incident monitoring.
Safety boundary: Проверяет только явно заданные URL, не сканирует сайт и не обходит авторизацию.
import time
import requests
TARGETS = {
"main_site": "https://example.com/",
"status_page": "https://example.com/status",
}
def check(name: str, url: str) -> str:
started = time.perf_counter()
try:
response = requests.get(url, timeout=5)
latency_ms = int((time.perf_counter() - started) * 1000)
return f"{name}: HTTP {response.status_code}, {latency_ms} ms"
except requests.RequestException as exc:
return f"{name}: DOWN, {exc.__class__.__name__}"
for name, url in TARGETS.items():
print(check(name, url))
advanced
Threat Detection
Access-log feature extraction
Показывает, какие признаки можно извлечь из access-log перед rule/ML triage.
Safety boundary: Не делает выводов об атаке сам по себе; только готовит признаки для аналитика.
from dataclasses import dataclass
from urllib.parse import urlparse, parse_qs
@dataclass
class HttpFeatures:
path_depth: int
query_keys: int
has_encoded_chars: bool
looks_like_admin_path: bool
def extract_features(url: str) -> HttpFeatures:
parsed = urlparse(url)
path_parts = [part for part in parsed.path.split("/") if part]
query = parse_qs(parsed.query, keep_blank_values=True)
return HttpFeatures(
path_depth=len(path_parts),
query_keys=len(query),
has_encoded_chars="%" in url,
looks_like_admin_path=any(part in {"admin", "manage", "debug"} for part in path_parts),
)
print(extract_features("/account/profile?id=42"))
defensive
Read-only HAR inventory
Авторский безопасный фрагмент для owned/lab-среды. Используется как идея проверки, а не как автономный атакующий инструмент.
import json
from pathlib import Path
from urllib.parse import urlsplit
OBJECT_KEYS = ("id", "userId", "accountId", "profileId", "invoiceId", "orderId")
def collect_operations(har_path: str) -> list[dict]:
har = json.loads(Path(har_path).read_text(encoding="utf-8"))
operations = []
for item in har.get("log", {}).get("entries", []):
req = item.get("request", {})
method = req.get("method", "GET").upper()
url = req.get("url", "")
parsed = urlsplit(url)
params = [p.get("name", "") for p in req.get("queryString", [])]
object_params = [name for name in params if name in OBJECT_KEYS or name.lower().endswith("id")]
operations.append({
"method": method,
"host": parsed.netloc,
"path": parsed.path,
"read_only": method in {"GET", "HEAD", "OPTIONS"},
"object_params": sorted(set(object_params)),
})
return operations
for op in collect_operations("owned-test-session.har"):
if op["read_only"] and op["object_params"]:
print(op)
defensive
Evidence secret redactor
Авторский безопасный фрагмент для owned/lab-среды. Используется как идея проверки, а не как автономный атакующий инструмент.
import re
PATTERNS = [
re.compile(r"(?i)(api[_-]?key|token|secret|password)\s*[:=]\s*['\"]?([A-Za-z0-9_\-]{12,})"),
re.compile(r"AKIA[0-9A-Z]{16}"),
re.compile(r"gh[pousr]_[A-Za-z0-9_]{20,}"),
]
def redact(text: str) -> str:
result = text
for pattern in PATTERNS:
result = pattern.sub(lambda m: m.group(0)[:8] + "...[REDACTED]", result)
return result
sample = "api_key='abcd1234abcd1234abcd1234' and deploy_token=ghp_exampletokenexampletoken"
print(redact(sample))
defensive
CVE remediation priority
Авторский безопасный фрагмент для owned/lab-среды. Используется как идея проверки, а не как автономный атакующий инструмент.
def priority(vuln: dict) -> str:
score = 0
if vuln.get("severity") in {"Critical", "High"}:
score += 2
if vuln.get("internet_exposed"):
score += 2
if vuln.get("runtime_reachable"):
score += 2
if vuln.get("known_exploited"):
score += 3
if vuln.get("fixed_version"):
score += 1
if score >= 6:
return "fix-now"
if score >= 3:
return "schedule"
return "monitor"
print(priority({
"severity": "High",
"internet_exposed": True,
"runtime_reachable": True,
"known_exploited": False,
"fixed_version": "2.4.1",
}))
defensive
AI policy gate
Авторский безопасный фрагмент для owned/lab-среды. Используется как идея проверки, а не как автономный атакующий инструмент.
ALLOWED_TECHNIQUES = {
"recon.http_headers",
"recon.tls_inventory",
"api.inventory_from_har",
"session.authenticated_replay_readonly",
}
def policy_decision(step: dict) -> tuple[bool, str]:
technique = step.get("technique")
method = step.get("http_method", "GET").upper()
if technique not in ALLOWED_TECHNIQUES:
return False, "technique is not allowlisted"
if method not in {"GET", "HEAD", "OPTIONS"}:
return False, "state-changing replay requires approval gate"
if step.get("target_out_of_scope"):
return False, "target is outside verified scope"
return True, "allowed"
print(policy_decision({"technique": "api.inventory_from_har", "http_method": "GET"}))
web/api
Diff OpenAPI Рё observed traffic РїРѕ path/method
Если у команды есть только OpenAPI, она видит желаемую архитектуру. Если есть только трафик, она не понимает намерение. Полезный аудит строится на diff между схемой, HAR, gateway-логами и правами ролей.
import json
from collections import defaultdict
from pathlib import Path
def openapi_ops(path: str) -> set[tuple[str, str]]:
spec = json.loads(Path(path).read_text(encoding="utf-8"))
result = set()
for route, methods in spec.get("paths", {}).items():
for method in methods:
result.add((method.upper(), route))
return result
def observed_ops(path: str) -> set[tuple[str, str]]:
data = json.loads(Path(path).read_text(encoding="utf-8"))
result = set()
for item in data:
result.add((item["method"].upper(), item["path"]))
return result
doc = openapi_ops("openapi.json")
obs = observed_ops("traffic_inventory.json")
print("observed_only", sorted(obs - doc)[:10])
print("doc_only", sorted(doc - obs)[:10])
identity
Session rotation после WebAuthn step-up
Продукт кажется современным, когда на логине есть passkeys. Но зрелая проверка смотрит дальше: как меняется session id после step-up, какие recovery flows сильнее или слабее WebAuthn и может ли stolen session обойти весь новый красивый UX.
def complete_webauthn_stepup(session: dict, user_id: str, assurance: str) -> dict:
fresh = {
"user_id": user_id,
"session_id": issue_new_session_id(),
"assurance": assurance,
"stepup_at": now_iso(),
"csrf_token": issue_csrf_token(),
}
revoke_session(session["session_id"])
persist_session(fresh)
return fresh
def privileged_action_allowed(session: dict, max_age_seconds: int = 900) -> bool:
if session.get("assurance") not in {"webauthn", "mfa"}:
return False
return age_seconds(session.get("stepup_at")) <= max_age_seconds
visibility
Мини-pack с безопасным query budget
Хороший security visibility слой задаёт не только вопрос 'что можно спросить у хоста', но и 'какой ценой'. В зрелой среде инвентаризация, hardening и compliance-запросы отделены от тяжёлых ad hoc checks.
{
"queries": {
"listening_ports_inventory": {
"query": "select pid, port, protocol, address from listening_ports;",
"interval": 3600,
"description": "Inventory of externally relevant listening ports",
"platform": "linux,darwin,windows",
"snapshot": true
},
"sshd_hardening_check": {
"query": "select key, value from ssh_configs where key in ('PermitRootLogin','PasswordAuthentication');",
"interval": 21600,
"description": "Low-cost SSH hardening verification",
"platform": "linux"
}
}
}
kubernetes
Gatekeeper constraint для запрета privileged containers
Зрелая проверка Kubernetes строится слоями: inventory и posture, policy-as-code на admission, runtime signals и проверяемый hardening diff. Только тогда misconfiguration перестаёт возвращаться в следующем deploy.
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sPSPPrivilegedContainer
metadata:
name: disallow-privileged
spec:
match:
kinds:
- apiGroups: [""]
kinds: ["Pod"]
excludedNamespaces:
- kube-system
parameters:
exemptImages:
- "registry.example/trusted-debug:*"
supply chain
Проверка attestation перед релизом
Наличие SBOM ещё не означает доверие к релизу. Полезная цепочка включает digest, provenance, signed attestations, policy decision и понятный verification step перед deploy.
IMAGE="registry.example/app@sha256:REPLACE_ME"
# verify the image signature
cosign verify "$IMAGE"
# verify attached attestations / provenance
cosign verify-attestation "$IMAGE"
# optional: require annotations or policy claims
cosign verify-attestation -a env=production "$IMAGE"
soc
Простой подсчёт FP rate и stale rules
Detection engineering становится полезной для бизнеса, когда умеет честно измерять шум, скорость исправления и качество handoff. Иначе SOC просто получает всё больше правил и всё меньше доверия к ним.
from datetime import date
def rule_health(rule: dict) -> dict:
alerts = max(rule.get("alerts", 0), 1)
fp_rate = rule.get("false_positives", 0) / alerts
stale = (date.today() - date.fromisoformat(rule["last_review"])).days > 90
return {"rule_id": rule["id"], "fp_rate": round(fp_rate, 2), "stale": stale}
sample = {"id": "edge-404-burst", "alerts": 42, "false_positives": 18, "last_review": "2026-03-20"}
print(rule_health(sample))