Files
X-Financial/hermes/skills/domain/x-financial-callback/scripts/send_callback.py

71 lines
2.3 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sys
import time
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
def parse_args() -> argparse.Namespace:
    """Build the command-line interface and parse ``sys.argv``.

    Options:
        --url, --token   Required; callback endpoint and bearer token.
        --retries        Integer attempt count, default 3.
        --type           Stored on the namespace as ``task_type``.
        --run-id         Stored as ``run_id``.
        --status         One of ``running``, ``succeeded``, ``failed``.
        --summary, --error  Free-form strings.

    Every optional string option defaults to the empty string.

    Returns:
        The populated ``argparse.Namespace``.
    """
    cli = argparse.ArgumentParser(
        description="Send one generic X-Financial Hermes callback.",
    )

    # Mandatory connection settings.
    for required_flag in ("--url", "--token"):
        cli.add_argument(required_flag, required=True)

    cli.add_argument("--retries", type=int, default=3)

    # Optional envelope fields; an empty string means "not supplied".
    cli.add_argument("--type", dest="task_type", default="")
    cli.add_argument("--run-id", default="")
    allowed_statuses = ("running", "succeeded", "failed")
    cli.add_argument("--status", choices=allowed_statuses, default="")
    for text_flag in ("--summary", "--error"):
        cli.add_argument(text_flag, default="")

    namespace = cli.parse_args()
    return namespace
def main() -> int:
    """Read a JSON document from stdin and POST it to the callback URL.

    The document is sent as-is unless ``--type``, ``--run-id`` and
    ``--status`` are all non-empty, in which case it is nested under the
    ``"payload"`` key of an envelope that also carries ``type``, ``run_id``,
    ``status``, ``summary`` and (only when non-empty) ``error``.

    The POST is attempted up to ``max(1, --retries)`` times, sleeping
    ``min(attempt, 3)`` seconds between attempts.

    Returns:
        0 after a 2xx response (the response body is printed to stdout);
        1 if stdin is not valid JSON, the URL is malformed, or every attempt
        failed (the last error is printed to stderr).
    """
    # Function-scope import so this function does not depend on a change to
    # the module's import block.
    from http.client import HTTPException

    args = parse_args()

    try:
        payload = json.load(sys.stdin)
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError.
        print(f"invalid JSON on stdin: {exc}", file=sys.stderr)
        return 1

    # NOTE(review): when only some of --type/--run-id/--status are given the
    # envelope is skipped and those options are silently ignored -- confirm
    # that this is intended.
    if args.task_type and args.run_id and args.status:
        envelope = {
            "type": args.task_type,
            "run_id": args.run_id,
            "status": args.status,
            "summary": args.summary,
            "payload": payload,
        }
        if args.error:
            envelope["error"] = args.error
        payload = envelope

    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    headers = {
        "Authorization": f"Bearer {args.token}",
        "Content-Type": "application/json",
    }
    attempts = max(1, args.retries)  # always try at least once
    last_error = ""

    for attempt in range(1, attempts + 1):
        try:
            request = Request(args.url, data=body, headers=headers, method="POST")
            with urlopen(request, timeout=30) as response:
                status = response.status
                # errors="replace": a non-UTF-8 body must not turn an
                # already-delivered callback into a crash.
                response_body = response.read().decode("utf-8", errors="replace")
        except HTTPError as exc:
            last_error = f"HTTP {exc.code}: {exc.read().decode('utf-8', errors='replace')}"
        except URLError as exc:
            last_error = str(exc.reason)
        except (OSError, HTTPException) as exc:
            # Read timeouts, connection resets and protocol errors raised
            # while waiting for / reading the response are not wrapped in
            # URLError; treat them as retryable transport failures too.
            last_error = f"{type(exc).__name__}: {exc}"
        except ValueError as exc:
            # Malformed URL (e.g. missing scheme): retrying cannot help.
            last_error = str(exc)
            break
        else:
            if 200 <= status < 300:
                print(response_body)
                return 0
            last_error = f"HTTP {status}: {response_body}"

        if attempt < attempts:
            # Linear back-off capped at three seconds.
            time.sleep(min(attempt, 3))

    print(last_error or "callback failed", file=sys.stderr)
    return 1
# Script entry point: run main() and use its return value as the process
# exit status (0 on success, 1 on failure).
if __name__ == "__main__":
    sys.exit(main())