Real-world plugin patterns
Auto-commit on file write
Agent edits files in your project; you want each edit auto-committed without telling the agent to remember it.
post_tool_call hook# plugin.yaml
name: auto-commit
version: 1.0.0
manifest_version: 1
description: "Auto-commit each file the agent writes."
author: "you@example.com"
kind: standalone
provides_hooks:
- post_tool_call# __init__.py
import subprocess
from pathlib import Path
def commit_on_write(ctx):
"""Fires after every successful tool call. Commits write_file changes."""
if ctx.tool_name != "write_file" or not ctx.success:
return
path_str = ctx.params.get("path", "")
if not path_str or "/projects/" not in path_str:
return # only auto-commit inside ~/projects
path = Path(path_str).expanduser()
repo_dir = path.parent
try:
subprocess.run(["git", "add", str(path)], cwd=repo_dir, check=True)
subprocess.run(
["git", "commit", "-m", f"agent: edit {path.name}"],
cwd=repo_dir, check=True, capture_output=True,
)
except subprocess.CalledProcessError:
pass # not a git repo, nothing to commit, etc.
def register(ctx):
ctx.register_hook("post_tool_call", commit_on_write)Why a hook and not a tool? Because the agent shouldn't have to remember to commit. The hook makes it part of the runtime — invisible.
Block dangerous shell commands
You don't want the agent running rm -rf, sudo, or curl piped to bash without your explicit approval.
pre_tool_call hook with BlockAction# __init__.py
import re
from flowly.agent.hooks import BlockAction
DANGEROUS_PATTERNS = [
re.compile(r"\brm\s+-rf\s+/"),
re.compile(r"\bsudo\b"),
re.compile(r"\bcurl\b.*\|\s*(bash|sh|zsh)\b"),
re.compile(r"\bdd\s+if=.*\bof=/dev/"),
]
def guard_exec(ctx):
if ctx.tool_name != "exec":
return
cmd = ctx.params.get("command", "")
for pattern in DANGEROUS_PATTERNS:
if pattern.search(cmd):
return BlockAction(
message=f"command-guard blocked dangerous pattern: {pattern.pattern}"
)
def register(ctx):
ctx.register_hook("pre_tool_call", guard_exec)BlockAction short-circuits dispatch — the tool's handler is never called. The agent gets back a [blocked: ...] string and can react accordingly.
Focus mode (Pomodoro)
You want a /focus 25 command that disables distracting tools (web_fetch, email_send) for the next 25 minutes.
slash commandpre_tool_call hook# __init__.py
import time
from flowly.agent.hooks import BlockAction
# Module-level state — lives until process restart
_focus_until: float | None = None
_blocked_tools = {"web_fetch", "email_send", "x_post"}
def start_focus(args: str) -> str:
"""/focus <minutes> — start a focus session."""
global _focus_until
try:
minutes = int(args.strip() or "25")
except ValueError:
return "Usage: /focus <minutes>"
_focus_until = time.time() + minutes * 60
return f"🎯 Focus mode active for {minutes} minutes. Distracting tools blocked."
def stop_focus(args: str) -> str:
"""/focus-end — manually end the session."""
global _focus_until
_focus_until = None
return "✅ Focus mode ended."
def block_during_focus(ctx):
if _focus_until is None or time.time() >= _focus_until:
return None
if ctx.tool_name in _blocked_tools:
remaining = int((_focus_until - time.time()) / 60)
return BlockAction(message=f"focus mode active ({remaining}m remaining)")
def register(ctx):
ctx.register_command("focus", start_focus, description="Start focus mode", args_hint="<minutes>")
ctx.register_command("focus-end", stop_focus, description="End focus mode")
ctx.register_hook("pre_tool_call", block_during_focus)Internal company API
Your company has an internal Salesforce-like CRM. You want the agent to look up accounts, log activities, and create tasks.
register_tool ×3check_fn for auth gate# plugin.yaml
name: acme-crm
version: 0.3.0
manifest_version: 1
description: "Acme internal CRM integration."
author: "platform@acme.com"
kind: standalone
provides_tools:
- acme_lookup_account
- acme_log_activity
- acme_create_task
requires_env:
- ACME_CRM_TOKEN
- ACME_CRM_BASE_URL# __init__.py
import os
import httpx
def _auth_ok() -> bool:
return bool(os.getenv("ACME_CRM_TOKEN") and os.getenv("ACME_CRM_BASE_URL"))
def _client() -> httpx.AsyncClient:
return httpx.AsyncClient(
base_url=os.environ["ACME_CRM_BASE_URL"],
headers={"Authorization": f"Bearer {os.environ['ACME_CRM_TOKEN']}"},
timeout=15,
)
async def lookup_account(name: str) -> str:
async with _client() as c:
r = await c.get("/accounts", params={"q": name, "limit": 5})
r.raise_for_status()
data = r.json()
if not data["results"]:
return f"No accounts matching '{name}'."
lines = [f"{a['name']} (id={a['id']}, ARR=${a['arr']:,})" for a in data["results"]]
return "\n".join(lines)
async def log_activity(account_id: str, kind: str, notes: str) -> str:
async with _client() as c:
r = await c.post(
f"/accounts/{account_id}/activities",
json={"kind": kind, "notes": notes},
)
r.raise_for_status()
return f"Logged {kind} on account {account_id}."
async def create_task(account_id: str, title: str, due_date: str) -> str:
async with _client() as c:
r = await c.post(
f"/accounts/{account_id}/tasks",
json={"title": title, "due": due_date},
)
r.raise_for_status()
return f"Task created: {title}"
def register(ctx):
common_check = _auth_ok
ctx.register_tool(
name="acme_lookup_account",
schema={
"parameters": {
"type": "object",
"properties": {"name": {"type": "string"}},
"required": ["name"],
},
},
handler=lookup_account,
check_fn=common_check,
description="Look up Acme CRM accounts by name.",
)
ctx.register_tool(
name="acme_log_activity",
schema={
"parameters": {
"type": "object",
"properties": {
"account_id": {"type": "string"},
"kind": {"type": "string", "enum": ["meeting", "call", "email", "note"]},
"notes": {"type": "string"},
},
"required": ["account_id", "kind", "notes"],
},
},
handler=log_activity,
check_fn=common_check,
description="Log an activity on an Acme account.",
)
ctx.register_tool(
name="acme_create_task",
schema={
"parameters": {
"type": "object",
"properties": {
"account_id": {"type": "string"},
"title": {"type": "string"},
"due_date": {"type": "string", "description": "YYYY-MM-DD"},
},
"required": ["account_id", "title", "due_date"],
},
},
handler=create_task,
check_fn=common_check,
description="Create a follow-up task tied to an account.",
)The check_fn runs at dispatch time. If the env vars aren't set, the tool returns a clear "unavailable" message instead of crashing. Three tools, one check function — clean separation.
Compliance audit logger
Regulated industry — every tool call needs to land in an external SIEM (Splunk, Datadog) for SOC 2 audit trail.
post_tool_call hookslash command for live tail# __init__.py
import json
import os
import threading
from datetime import datetime, timezone
from pathlib import Path
import httpx
_SIEM_ENDPOINT = os.getenv("SIEM_HTTP_INPUT")
_LOCAL_LOG = Path.home() / ".flowly" / "audit" / "tools.jsonl"
_recent: list[dict] = []
_lock = threading.Lock()
def _emit(record: dict) -> None:
"""Write to local JSONL + best-effort POST to SIEM."""
_LOCAL_LOG.parent.mkdir(parents=True, exist_ok=True)
with open(_LOCAL_LOG, "a") as f:
f.write(json.dumps(record) + "\n")
if _SIEM_ENDPOINT:
try:
httpx.post(_SIEM_ENDPOINT, json=record, timeout=2)
except Exception:
pass # never let logging break the agent
def on_tool_done(ctx):
record = {
"ts": datetime.now(timezone.utc).isoformat(),
"session_id": ctx.session_id,
"tool": ctx.tool_name,
"params": ctx.params,
"duration_ms": ctx.duration_ms,
"success": ctx.success,
}
_emit(record)
with _lock:
_recent.append(record)
if len(_recent) > 100:
_recent.pop(0)
def show_recent(args: str) -> str:
n = int(args.strip() or "10")
with _lock:
items = _recent[-n:]
if not items:
return "No tool calls logged yet."
lines = [
f"{r['ts']} {r['tool']:<20} ({r['duration_ms']:.0f}ms) {'✓' if r['success'] else '✗'}"
for r in items
]
return "Recent tool calls:\n" + "\n".join(lines)
def register(ctx):
ctx.register_hook("post_tool_call", on_tool_done)
ctx.register_command(
"audit-tail",
show_recent,
description="Show recent tool calls from audit log",
args_hint="[count]",
)The hook captures every tool call with parameters, duration, and success status. Plus a slash command for on-demand inspection.
Domain context injection (legal practice)
Plugin registers tools but the agent doesn't know when to use them — falls back to memory_search instead. Inject domain context before every LLM call so the model picks the right tool.
pre_llm_call hookregister_tool ×6A common failure mode for plugin-registered tools: the model sees case_search in its tool list but doesn't know it's for legal cases — when the user mentions "Mehmet Demir," the model reaches for memory_search instead. The fix is to register a pre_llm_call hook that prepends a domain orientation to every user message.
# __init__.py — abridged from the bundled case-tracker plugin
import sqlite3
from pathlib import Path
def _connect():
"""SQLite at ~/.flowly/case-tracker/cases.db"""
...
def _inject_legal_context(ctx) -> str | None:
"""Tell the agent it's working with a legal practice that has cases."""
try:
with _connect() as conn:
row = conn.execute(
"SELECT COUNT(*) AS n FROM cases WHERE status = 'aktif'"
).fetchone()
active_count = row["n"] if row else 0
except Exception:
return None
if active_count == 0:
# No cases yet — don't waste tokens on context
return None
return (
f"You are assisting a Turkish family-law attorney. There are "
f"currently {active_count} active legal cases tracked locally.\n\n"
"When the user mentions a client name (Mehmet, Ayşe, etc.) or "
"refers to a 'dava' / 'müvekkil' / 'dosya', use the case_* tools "
"BEFORE memory_search or knowledge_graph:\n"
" • case_search(query) — find cases by client name\n"
" • case_get(id) — full detail + last 10 events\n"
" • case_list(status) — list cases by status\n"
" • case_create(...) — open a new case\n"
" • case_update(...) — change status or notes\n"
" • case_add_event(...) — log a hearing or client call"
)
def register(ctx):
# ... register the case_* tools as usual ...
ctx.register_hook("pre_llm_call", _inject_legal_context)Returned strings are wrapped in <plugin_context> tags and prepended to the last user message — never the system prompt, so the prompt cache prefix stays stable across turns.
memory_search and knowledge_graph, both empty, then giving up. With the hook, the agent now calls case_search → case_get and returns the full case timeline. Same tools, same model, same prompt — just better context.pre_llm_call. Skip it when the database is empty (return None) so dormant plugins don't pay the cost.PII redactor at the gateway boundary
A bot exposed to customers (Telegram support, public Discord) inevitably gets messages containing phone numbers, government IDs, or emails. Redact them at the gateway boundary so the LLM — and the audit log — never see raw PII.
pre_gateway_dispatch hookRewriteActionpre_gateway_dispatch fires once per inbound message from any channel (telegram/web/desktop/iOS/cli) BEFORE session work begins. Returning a RewriteAction replaces the content the agent processes — and the version that lands in audit logs.
# __init__.py
import re
from flowly.agent.hooks import RewriteAction
# Turkish mobile: 05XX-XXX-XX-XX with optional separators
_PHONE = re.compile(r"\b0?5\d{2}[\s\-\.]?\d{3}[\s\-\.]?\d{2}[\s\-\.]?\d{2}\b")
_TC = re.compile(r"\b[1-9]\d{10}\b") # 11-digit TC kimlik
_EMAIL = re.compile(r"\b[\w\.-]+@[\w\.-]+\.\w+\b")
def _redact(text: str) -> str:
text = _PHONE.sub("[REDACTED_PHONE]", text)
text = _TC.sub("[REDACTED_TC]", text)
text = _EMAIL.sub("[REDACTED_EMAIL]", text)
return text
def on_inbound(ctx):
msg = ctx.event # InboundMessage
redacted = _redact(msg.content)
if redacted == msg.content:
return None # nothing matched — pass through
return RewriteAction(text=redacted)
def register(ctx):
ctx.register_hook("pre_gateway_dispatch", on_inbound)RewriteAction replaces msg.content on the live InboundMessage. Every downstream consumer — the LLM, the session store, the audit log — sees only the redacted version. Raw PII never leaves the gateway boundary.SkipAction(reason=...) instead of RewriteAction when you want to drop the message entirely (spam filter, rate limit, off-hours auto-reply handled outside the agent). The agent never sees the message and no LLM tokens are spent.Reference: disk-cleanup
The disk-cleanup plugin ships with Flowly and is the canonical example combining post_tool_call + on_session_end + a slash command. It auto-tracks ephemeral test files the agent creates and cleans them up at the end of every turn — completely invisible to the agent.
flowly/plugins_bundled/disk-cleanup/ in the Flowly repo.