Files
opencode-skill/skills/alphaear-predictor/scripts/json_utils.py
Kunthawat Greethong 58f9380ec4 Import 9 alphaear finance skills
- alphaear-deepear-lite: DeepEar Lite API integration
- alphaear-logic-visualizer: Draw.io XML finance diagrams
- alphaear-news: Real-time finance news (10+ sources)
- alphaear-predictor: Kronos time-series forecasting
- alphaear-reporter: Professional financial reports
- alphaear-search: Web search + local RAG
- alphaear-sentiment: FinBERT/LLM sentiment analysis
- alphaear-signal-tracker: Signal evolution tracking
- alphaear-stock: A-Share/HK/US stock data

Updates:
- All scripts updated to use universal .env path
- Added JINA_API_KEY, LLM_*, DEEPSEEK_API_KEY to .env.example
- Updated load_dotenv() to use ~/.config/opencode/.env
2026-03-27 10:11:37 +07:00

181 lines
6.2 KiB
Python

import ast
import json
import re
from typing import Optional, Any
from loguru import logger
def _strip_comments(text: str) -> str:
"""
Safely remove C-style comments (// and /* */) from JSON-like text,
preserving strings (including URLs like http://).
"""
result = []
i = 0
n = len(text)
in_string = False
escape = False
while i < n:
char = text[i]
if in_string:
if char == '\\':
escape = not escape
elif char == '"' and not escape:
in_string = False
else:
escape = False
result.append(char)
i += 1
continue
# Not in string
if char == '"':
in_string = True
result.append(char)
i += 1
continue
# Check for // comment
if i + 1 < n and text[i:i+2] == '//':
i += 2
while i < n and text[i] != '\n':
i += 1
continue
# Check for /* comment
if i + 1 < n and text[i:i+2] == '/*':
i += 2
while i + 1 < n and text[i:i+2] != '*/':
i += 1
i += 2
continue
result.append(char)
i += 1
return ''.join(result)
def _unwrap_markdown_fence(text: str) -> str:
    """Return the content of the first Markdown code fence in *text*, if any."""
    md_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', text, re.DOTALL)
    if md_match:
        return md_match.group(1).strip()
    if text.startswith("```"):
        # Fallback: an opening fence without a complete match.
        text = re.sub(r'^```[a-z]*\n?', '', text)
        text = re.sub(r'\n?```\s*$', '', text)
    return text


def _repair_keys(candidate: str) -> str:
    """Quote object keys that are missing one or both double quotes.

    The patterns are heuristic and also match inside string values, so the
    result must only be used after a parse of the unrepaired text has failed.
    """
    # Key missing its opening quote:  nodes": [  ->  "nodes": [
    candidate = re.sub(r'([\{\,]\s*)([a-zA-Z_]\w*)\"\s*:', r'\1"\2":', candidate)
    # Key missing its closing quote:  "nodes: [  ->  "nodes": [
    candidate = re.sub(r'([\{\,]\s*)\"([a-zA-Z_]\w*)\s*:', r'\1"\2":', candidate)
    # Key with no quotes at all:      nodes: [   ->  "nodes": [
    # Anchored after { or , so that text such as http:// is not matched.
    candidate = re.sub(r'([\{\,]\s*)([a-zA-Z_]\w*)\s*:', r'\1"\2":', candidate)
    return candidate


def _join_multiline_strings(s: str) -> str:
    """Fold raw newlines that fall inside a string value into single spaces.

    A line with an odd number of unescaped double quotes toggles the
    "inside a string" state; while inside, following lines are appended to
    the previous output line.
    """
    merged = []
    in_string = False
    for line in s.split('\n'):
        quote_count = line.count('"') - line.count('\\"')
        if in_string:
            merged[-1] += ' ' + line.strip()
        else:
            merged.append(line)
        if quote_count % 2 == 1:
            in_string = not in_string
    return '\n'.join(merged)


def _single_to_double_quotes(s: str) -> str:
    """Rewrite simple 'key': 'value' pairs to use double quotes.

    This may damage string values that themselves contain single quotes,
    which is why it is only used as a late fallback.
    """
    s = re.sub(r"'(.*?)':", r'"\1":', s)  # keys
    return re.sub(r":\s*'(.*?)'", r': "\1"', s)  # simple values


def _first_braced_prefix(s: str) -> Optional[str]:
    """Return the prefix of *s* ending at the first '}' that balances all '{'.

    Braces are counted without regard to string literals. Returns None when
    no such closing brace exists.
    """
    depth = 0
    for idx, char in enumerate(s):
        if char == '{':
            depth += 1
        elif char == '}':
            if depth:
                depth -= 1
            if depth == 0:
                return s[:idx + 1]
    return None


def extract_json(text: str) -> Optional[Any]:
    """
    Extract the first JSON object or array embedded in *text*.

    Handles:
    1. Markdown code fences (```json ... ```).
    2. Extra characters before and after the JSON value.
    3. Several JSON values in one text (only the first is returned).
    4. Simple repairs: unquoted or half-quoted keys, trailing commas, raw
       newlines inside strings, single-quoted keys and values.
    5. C-style comments (// and /* */).
    6. Python-literal dicts, via ast.literal_eval, as the last resort.

    The unrepaired text is always parsed first. The repairs are regex
    heuristics that can also rewrite the contents of string values, so they
    are applied only after a strict parse has failed.

    Args:
        text: Raw text (typically LLM output) that may contain JSON.

    Returns:
        The decoded value, or None when the text is empty, contains no
        '{' or '[', or every extraction attempt fails.
    """
    if not text:
        return None

    # 1. Drop obvious Markdown wrapping.
    text = _unwrap_markdown_fence(text.strip())

    # 2. Locate the first JSON start character, { or [.
    starts = [pos for pos in (text.find('{'), text.find('[')) if pos != -1]
    if not starts:
        return None
    raw = _strip_comments(text[min(starts):].strip())
    repaired = _repair_keys(raw)

    # 3. Decode with raw_decode, which tolerates trailing text, escalating
    #    from the untouched input to progressively more invasive repairs.
    def candidates():
        yield raw
        yield repaired
        # Remove trailing commas before a closing ] or }.
        no_trailing_commas = re.sub(r',\s*([\]}])', r'\1', repaired)
        yield no_trailing_commas
        yield _join_multiline_strings(no_trailing_commas)
        yield _single_to_double_quotes(no_trailing_commas)

    decoder = json.JSONDecoder()
    tried = set()
    for candidate in candidates():
        if candidate in tried:
            continue  # this repair changed nothing; no point decoding again
        tried.add(candidate)
        try:
            obj, _ = decoder.raw_decode(candidate)
        except json.JSONDecodeError:
            continue
        return obj

    # 4. Last resort: evaluate the first brace-balanced prefix as a Python
    #    literal (handles dicts written with Python syntax). literal_eval
    #    only evaluates literals; it does not execute code.
    failure = None  # (log function, message) for the most recent failure
    sources = (raw,) if repaired == raw else (raw, repaired)
    for source in sources:
        snippet = _first_braced_prefix(source)
        if snippet is None:
            continue
        try:
            return ast.literal_eval(snippet)
        except (ValueError, SyntaxError, MemoryError) as e:
            failure = (logger.warning, f"All JSON extraction attempts failed: {e}")
        except Exception as e:
            failure = (logger.error, f"Unexpected error during JSON extraction: {e}")
    if failure is not None:
        log, message = failure
        log(message)
    return None