#!/usr/bin/env python3 """ RPC-прокси для Subtensor: слушает на своём порту, пересылает запросы на локальный субтензор, подменяет node ID и внешний IP в ответах (system_localPeerId, system_networkState), чтобы эмулировать ноду с другой идентичностью. """ import asyncio import json import os import re from typing import Any import aiohttp from aiohttp import web # Методы RPC, в ответах которых подменяем идентичность ноды # Substrate: system_localPeerId (camelCase); на всякий случай и snake_case SYSTEM_LOCAL_PEER_ID_ALIASES = ("system_localPeerId", "system_local_peer_id") SYSTEM_NETWORK_STATE_ALIASES = ("system_networkState", "system_network_state") # FlameWire (FAQ) проверяет listen-адреса; возвращаем адреса прокси, а не бэкенда SYSTEM_LOCAL_LISTEN_ADDRESSES_ALIASES = ("system_localListenAddresses", "system_local_listen_addresses") # Имя/версия ноды — подмена чтобы не светить бэкенд (могут входить в отпечаток) SYSTEM_NAME_ALIASES = ("system_name", "system_Name") SYSTEM_VERSION_ALIASES = ("system_version", "system_Version") def get_config() -> dict: # Один адрес ноды (хост:порт) — задаёт и HTTP, и WS бэкенд node_address = os.environ.get("SUBTENSOR_NODE_ADDRESS", "").strip() if node_address: if "://" in node_address: base = node_address ws_base = node_address.replace("http://", "ws://").replace("https://", "wss://") else: base = f"http://{node_address}" ws_base = f"ws://{node_address}" backend_url = base ws_backend_url = ws_base else: backend_url = os.environ.get("SUBTENSOR_RPC_BACKEND", "http://127.0.0.1:9944") ws_backend_url = os.environ.get("SUBTENSOR_WS_BACKEND", "ws://127.0.0.1:9944") return { "backend_url": backend_url, "ws_backend_url": ws_backend_url, "listen_host": os.environ.get("PROXY_LISTEN_HOST", "0.0.0.0"), "listen_port": int(os.environ.get("PROXY_LISTEN_PORT", "9946")), "proxy_node_id": os.environ.get("PROXY_NODE_ID", "12D3KooWProxy00000000000000000000000000000000000000000001"), "proxy_external_ip": os.environ.get("PROXY_EXTERNAL_IP", ""), "proxy_listen_multiaddr": os.environ.get("PROXY_LISTEN_MULTIADDR", ""), "proxy_node_name": os.environ.get("PROXY_NODE_NAME", "subtensor-node-0"), "proxy_node_version": os.environ.get("PROXY_NODE_VERSION", "4.0.0-dev-unknown"), } def override_local_peer_id(result: Any, proxy_node_id: str) -> Any: """Подмена результата system_localPeerId.""" return proxy_node_id def override_network_state(result: Any, config: dict) -> Any: """Подмена в system_networkState: все поля с peer ID и адресами (в т.ч. snake_case).""" if not isinstance(result, dict): return result out = dict(result) node_id = config["proxy_node_id"] # Все варианты ключей peer ID for key in ("peerId", "peer_id", "localPeerId", "local_peer_id"): if key in out: out[key] = node_id out["peerId"] = node_id out["peer_id"] = node_id # Адреса listen_addrs = get_proxy_listen_addresses(config) for key in ("listened_addresses", "listenedAddresses"): out[key] = listen_addrs if config["proxy_external_ip"]: addr = config["proxy_external_ip"].strip() if not re.match(r"^/ip[46]/", addr): addr = f"/ip4/{addr}/tcp/30333" for key in ("external_addresses", "externalAddresses"): out[key] = [addr] return out def get_proxy_listen_addresses(config: dict) -> list: """Адреса для system_localListenAddresses (multiaddr список).""" if config.get("proxy_listen_multiaddr"): return [config["proxy_listen_multiaddr"]] if config.get("proxy_external_ip"): addr = config["proxy_external_ip"].strip() if re.match(r"^/ip[46]/", addr): return [addr] return [f"/ip4/{addr}/tcp/30333"] return ["/ip4/0.0.0.0/tcp/30333"] async def forward_request( session: aiohttp.ClientSession, backend_url: str, body: dict, config: dict, ) -> dict: """Отправляет JSON-RPC запрос на бэкенд и при необходимости подменяет ответ.""" method = body.get("method") if isinstance(body, dict) else None if isinstance(body, list): # Batch: массив запросов async with session.post(backend_url, json=body) as resp: data = await resp.json() if not isinstance(data, list): return data # В batch ответах по позиции совпадают с запросами; по id — с request id requests = body results = data for i, req in enumerate(requests): if i >= len(results): break r = requests[i] res = results[i] if not isinstance(r, dict) or not isinstance(res, dict): continue meth = r.get("method") if "result" in res: if meth in SYSTEM_LOCAL_PEER_ID_ALIASES: res["result"] = config["proxy_node_id"] elif meth in SYSTEM_NETWORK_STATE_ALIASES: res["result"] = override_network_state(res["result"], config) elif meth in SYSTEM_LOCAL_LISTEN_ADDRESSES_ALIASES: res["result"] = get_proxy_listen_addresses(config) elif meth in SYSTEM_NAME_ALIASES: res["result"] = config["proxy_node_name"] elif meth in SYSTEM_VERSION_ALIASES: res["result"] = config["proxy_node_version"] return data async with session.post(backend_url, json=body) as resp: data = await resp.json() if not isinstance(data, dict): return data if method in SYSTEM_LOCAL_PEER_ID_ALIASES and "result" in data: data["result"] = override_local_peer_id(data["result"], config["proxy_node_id"]) if os.environ.get("PROXY_DEBUG"): print(f"[proxy] Подменён {method} -> {config['proxy_node_id']}", flush=True) elif method in SYSTEM_NETWORK_STATE_ALIASES and "result" in data: data["result"] = override_network_state(data["result"], config) if os.environ.get("PROXY_DEBUG"): print(f"[proxy] Подменён system_networkState (peerId, external)", flush=True) elif method in SYSTEM_LOCAL_LISTEN_ADDRESSES_ALIASES and "result" in data: data["result"] = get_proxy_listen_addresses(config) if os.environ.get("PROXY_DEBUG"): print(f"[proxy] Подменён system_localListenAddresses", flush=True) elif method in SYSTEM_NAME_ALIASES and "result" in data: data["result"] = config["proxy_node_name"] elif method in SYSTEM_VERSION_ALIASES and "result" in data: data["result"] = config["proxy_node_version"] return data async def handle_http_rpc(request: web.Request) -> web.Response: config = request.app["config"] session: aiohttp.ClientSession = request.app["http_session"] try: body = await request.json() except Exception: return web.json_response({"jsonrpc": "2.0", "error": {"code": -32700, "message": "Parse error"}, "id": None}, status=400) try: backend = config["backend_url"] result = await forward_request(session, backend, body, config) return web.json_response(result) except aiohttp.ClientError as e: return web.json_response( {"jsonrpc": "2.0", "error": {"code": -32603, "message": f"Backend error: {e}"}, "id": body.get("id") if isinstance(body, dict) else None}, status=502, ) except Exception as e: return web.json_response( {"jsonrpc": "2.0", "error": {"code": -32603, "message": str(e)}, "id": body.get("id") if isinstance(body, dict) else None}, status=500, ) def apply_identity_overrides(data: Any, config: dict) -> Any: """Подменяет node ID / network state в JSON-RPC ответе (одиночный объект или batch).""" if isinstance(data, list): return [apply_identity_overrides(item, config) for item in data] if not isinstance(data, dict): return data out = dict(data) if "result" in out: # Одиночный ответ по id запроса — method не передаётся в ответе, смотрим по контексту или по форме result res = out["result"] if isinstance(res, str) and res and not res.startswith("0x") and len(res) > 40: # Похоже на peer ID (base58) out["result"] = config["proxy_node_id"] elif isinstance(res, dict) and ("peerId" in res or "listened_addresses" in res): out["result"] = override_network_state(res, config) elif isinstance(res, list) and res and isinstance(res[0], str) and (res[0].startswith("/ip4/") or res[0].startswith("/dns4/")): # system_localListenAddresses: список multiaddr out["result"] = get_proxy_listen_addresses(config) if "params" in out and isinstance(out["params"], dict) and "result" in out["params"]: pr = out["params"]["result"] if isinstance(pr, dict) and ("peerId" in pr or "listened_addresses" in pr): out["params"]["result"] = override_network_state(pr, config) return out async def handle_ws_proxy(request: web.Request) -> web.StreamResponse: """WebSocket: пересылаем запросы на бэкенд, в ответах подменяем system_localPeerId/system_networkState.""" ws = web.WebSocketResponse() await ws.prepare(request) config = request.app["config"] backend_ws = config["ws_backend_url"] async with aiohttp.ClientSession() as session: try: async with session.ws_connect(backend_ws) as backend_ws_conn: async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: try: body = json.loads(msg.data) single_method = isinstance(body, dict) and body.get("method") if single_method in (*SYSTEM_LOCAL_PEER_ID_ALIASES, *SYSTEM_NETWORK_STATE_ALIASES, *SYSTEM_LOCAL_LISTEN_ADDRESSES_ALIASES, *SYSTEM_NAME_ALIASES, *SYSTEM_VERSION_ALIASES): async with session.post(config["backend_url"], json=body) as resp: data = await resp.json() if single_method in SYSTEM_LOCAL_PEER_ID_ALIASES and "result" in data: data["result"] = config["proxy_node_id"] elif single_method in SYSTEM_NETWORK_STATE_ALIASES and "result" in data: data["result"] = override_network_state(data["result"], config) elif single_method in SYSTEM_LOCAL_LISTEN_ADDRESSES_ALIASES and "result" in data: data["result"] = get_proxy_listen_addresses(config) elif single_method in SYSTEM_NAME_ALIASES and "result" in data: data["result"] = config["proxy_node_name"] elif single_method in SYSTEM_VERSION_ALIASES and "result" in data: data["result"] = config["proxy_node_version"] await ws.send_str(json.dumps(data)) else: await backend_ws_conn.send_str(msg.data) bmsg = await backend_ws_conn.receive() if bmsg.type == aiohttp.WSMsgType.TEXT and bmsg.data: try: bdata = json.loads(bmsg.data) bdata = apply_identity_overrides(bdata, config) await ws.send_str(json.dumps(bdata)) except (json.JSONDecodeError, TypeError): await ws.send_str(bmsg.data) elif bmsg.data: await ws.send_str(bmsg.data) except json.JSONDecodeError: await ws.send_str(json.dumps({"jsonrpc": "2.0", "error": {"code": -32700, "message": "Parse error"}, "id": None})) elif msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.ERROR): break except Exception as e: await ws.send_str(json.dumps({"jsonrpc": "2.0", "error": {"code": -32603, "message": str(e)}, "id": None})) return ws def create_app(config: dict) -> web.Application: app = web.Application() app["config"] = config app.router.add_post("/", handle_http_rpc) app.router.add_get("/", handle_ws_proxy) return app async def init_session(app: web.Application): app["http_session"] = aiohttp.ClientSession() async def close_session(app: web.Application): await app["http_session"].close() def main(): config = get_config() app = create_app(config) app.on_startup.append(init_session) app.on_cleanup.append(close_session) host = config["listen_host"] port = config["listen_port"] print(f"Subtensor RPC proxy: listen {host}:{port}, backend {config['backend_url']}, node_id={config['proxy_node_id']}, external_ip={config['proxy_external_ip'] or '(not set)'}") web.run_app(app, host=host, port=port) if __name__ == "__main__": main()