Add Claude N8N toolkit with Docker mock API server
- Added comprehensive N8N development tools collection - Added Docker-containerized mock API server for testing - Added complete documentation and setup guides - Added mock API server with health checks and data endpoints - Tools include workflow analyzers, debuggers, and controllers 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
279
claude_n8n/tools/template_error_reproducer.py
Normal file
279
claude_n8n/tools/template_error_reproducer.py
Normal file
@@ -0,0 +1,279 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Template Error Reproducer - Find the exact input causing template errors
|
||||
"""
|
||||
|
||||
import sys
|
||||
sys.path.append('/home/klas/claude_n8n/tools')
|
||||
import json
|
||||
import requests
|
||||
from typing import Dict, Optional
|
||||
|
||||
class TemplateErrorReproducer:
|
||||
"""Reproduce template errors by analyzing the exact configuration"""
|
||||
|
||||
def __init__(self, config_path: str = "n8n_api_credentials.json"):
|
||||
self.config = self._load_config(config_path)
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update(self.config['headers'])
|
||||
self.api_url = self.config['api_url']
|
||||
|
||||
def _load_config(self, config_path: str) -> Dict:
|
||||
with open(config_path, 'r') as f:
|
||||
return json.load(f)
|
||||
|
||||
def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None) -> Dict:
|
||||
url = f"{self.api_url.rstrip('/')}/{endpoint.lstrip('/')}"
|
||||
|
||||
try:
|
||||
if method.upper() == 'GET':
|
||||
response = self.session.get(url, params=data)
|
||||
elif method.upper() == 'PUT':
|
||||
response = self.session.put(url, json=data)
|
||||
else:
|
||||
raise ValueError(f"Unsupported method: {method}")
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json() if response.content else {}
|
||||
|
||||
except Exception as e:
|
||||
print(f"API Error: {e}")
|
||||
if hasattr(e, 'response') and e.response:
|
||||
print(f"Response: {e.response.text[:300]}")
|
||||
raise
|
||||
|
||||
def analyze_information_extractor_config(self, workflow_id: str):
|
||||
"""Analyze the exact Information Extractor configuration"""
|
||||
print("🔍 ANALYZING INFORMATION EXTRACTOR CONFIGURATION")
|
||||
print("=" * 60)
|
||||
|
||||
# Get workflow
|
||||
workflow = self._make_request('GET', f'/workflows/{workflow_id}')
|
||||
|
||||
# Find Information Extractor node
|
||||
info_extractor = None
|
||||
for node in workflow.get('nodes', []):
|
||||
if node.get('name') == 'Information Extractor':
|
||||
info_extractor = node
|
||||
break
|
||||
|
||||
if not info_extractor:
|
||||
print("❌ Information Extractor node not found")
|
||||
return
|
||||
|
||||
print(f"✅ Found Information Extractor node")
|
||||
|
||||
params = info_extractor.get('parameters', {})
|
||||
|
||||
# Analyze each parameter that could cause template issues
|
||||
print(f"\n📋 PARAMETER ANALYSIS:")
|
||||
|
||||
for param_name, param_value in params.items():
|
||||
print(f"\n 📝 {param_name}:")
|
||||
|
||||
if isinstance(param_value, str):
|
||||
print(f" Type: string")
|
||||
print(f" Length: {len(param_value)}")
|
||||
print(f" Preview: {repr(param_value[:100])}...")
|
||||
|
||||
# Check for potential template issues
|
||||
if '{{' in param_value and '}}' in param_value:
|
||||
print(f" ⚠️ Contains N8N expressions")
|
||||
self._analyze_n8n_expression(param_value)
|
||||
|
||||
if '"' in param_value and "'" in param_value:
|
||||
print(f" ⚠️ Contains mixed quotes")
|
||||
|
||||
elif isinstance(param_value, dict):
|
||||
print(f" Type: object with {len(param_value)} properties")
|
||||
for key, value in param_value.items():
|
||||
if isinstance(value, str) and len(value) > 50:
|
||||
print(f" {key}: {type(value)} ({len(value)} chars)")
|
||||
|
||||
# Check for template issues in nested values
|
||||
if '"' in value and "'" in value:
|
||||
print(f" ⚠️ Mixed quotes detected")
|
||||
self._find_quote_issues(value, f"{param_name}.{key}")
|
||||
|
||||
return info_extractor
|
||||
|
||||
def _analyze_n8n_expression(self, expression: str):
|
||||
"""Analyze N8N expressions for potential issues"""
|
||||
print(f" 🔧 N8N Expression Analysis:")
|
||||
|
||||
# Extract the expression content
|
||||
if '{{' in expression and '}}' in expression:
|
||||
start = expression.find('{{') + 2
|
||||
end = expression.find('}}')
|
||||
expr_content = expression[start:end].strip()
|
||||
|
||||
print(f" Expression: {expr_content}")
|
||||
|
||||
# Check for problematic patterns
|
||||
if 'JSON.stringify' in expr_content:
|
||||
print(f" ⚠️ Uses JSON.stringify - can cause quote escaping issues")
|
||||
|
||||
if '$json.chunk' in expr_content:
|
||||
print(f" 🎯 LIKELY ISSUE: JSON.stringify($json.chunk)")
|
||||
print(f" 💡 When chunk contains single quotes, this creates:")
|
||||
print(f" JSON.stringify(\"text with 'quotes'\") → \"\\\"text with 'quotes'\\\"\"")
|
||||
print(f" 💡 The escaped quotes can break LangChain f-string parsing")
|
||||
|
||||
if '.replace(' in expr_content:
|
||||
print(f" ⚠️ Uses string replacement - check for quote handling")
|
||||
|
||||
def _find_quote_issues(self, text: str, context: str):
|
||||
"""Find specific quote-related issues"""
|
||||
print(f" 🔍 Quote Analysis for {context}:")
|
||||
|
||||
# Look for unescaped quotes in JSON-like strings
|
||||
lines = text.split('\n')
|
||||
for i, line in enumerate(lines):
|
||||
if '"' in line and "'" in line:
|
||||
# Check for single quotes inside double-quoted strings
|
||||
if line.count('"') >= 2:
|
||||
# Find content between quotes
|
||||
parts = line.split('"')
|
||||
for j in range(1, len(parts), 2): # Odd indices are inside quotes
|
||||
content = parts[j]
|
||||
if "'" in content and "\\'" not in content:
|
||||
print(f" Line {i+1}: Unescaped single quote in JSON string")
|
||||
print(f" Content: {repr(content)}")
|
||||
print(f" 🚨 THIS COULD CAUSE TEMPLATE ERRORS")
|
||||
|
||||
def create_fixed_configuration(self, workflow_id: str):
|
||||
"""Create a fixed version of the Information Extractor configuration"""
|
||||
print(f"\n🔧 CREATING FIXED CONFIGURATION")
|
||||
print("=" * 40)
|
||||
|
||||
workflow = self._make_request('GET', f'/workflows/{workflow_id}')
|
||||
|
||||
# Find and fix Information Extractor
|
||||
fixed = False
|
||||
for node in workflow.get('nodes', []):
|
||||
if node.get('name') == 'Information Extractor':
|
||||
params = node.get('parameters', {})
|
||||
|
||||
# Fix the text parameter that uses JSON.stringify
|
||||
if 'text' in params:
|
||||
current_text = params['text']
|
||||
print(f"Current text param: {repr(current_text)}")
|
||||
|
||||
if '{{ JSON.stringify($json.chunk) }}' in current_text:
|
||||
# Replace with a safer approach that doesn't use JSON.stringify
|
||||
new_text = '{{ $json.chunk || "" }}'
|
||||
params['text'] = new_text
|
||||
print(f"Fixed text param: {repr(new_text)}")
|
||||
fixed = True
|
||||
|
||||
# Check and fix any other string parameters with quote issues
|
||||
def fix_quotes_in_object(obj, path=""):
|
||||
changed = False
|
||||
if isinstance(obj, dict):
|
||||
for key, value in obj.items():
|
||||
if fix_quotes_in_object(value, f"{path}.{key}" if path else key):
|
||||
changed = True
|
||||
elif isinstance(obj, str):
|
||||
# Fix unescaped single quotes in JSON-like content
|
||||
if '"' in obj and "'" in obj and "\\'" not in obj:
|
||||
# This is a potential issue - try to fix it
|
||||
original = obj
|
||||
# Replace unescaped single quotes with escaped ones
|
||||
fixed_str = obj.replace("'", "\\'")
|
||||
if fixed_str != original:
|
||||
print(f"Fixed quotes in {path}: {repr(original[:50])}... → {repr(fixed_str[:50])}...")
|
||||
return True
|
||||
return changed
|
||||
|
||||
if fix_quotes_in_object(params):
|
||||
print("Applied additional quote fixes")
|
||||
fixed = True
|
||||
|
||||
break
|
||||
|
||||
if fixed:
|
||||
# Apply the fixes
|
||||
update_payload = {
|
||||
'name': workflow['name'],
|
||||
'nodes': workflow['nodes'],
|
||||
'connections': workflow['connections'],
|
||||
'settings': workflow.get('settings', {}),
|
||||
'staticData': workflow.get('staticData', {})
|
||||
}
|
||||
|
||||
try:
|
||||
result = self._make_request('PUT', f'/workflows/{workflow_id}', update_payload)
|
||||
print(f"✅ Applied fixes to workflow")
|
||||
print(f"Updated at: {result.get('updatedAt')}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"❌ Failed to apply fixes: {e}")
|
||||
return False
|
||||
else:
|
||||
print("ℹ️ No fixes needed or no issues found")
|
||||
return False
|
||||
|
||||
def test_specific_inputs(self):
|
||||
"""Test specific inputs that are known to cause template issues"""
|
||||
print(f"\n🧪 TESTING SPECIFIC PROBLEMATIC INPUTS")
|
||||
print("=" * 45)
|
||||
|
||||
# These are inputs that commonly cause template parsing errors
|
||||
problematic_inputs = [
|
||||
"Message with single quote: it's working",
|
||||
'{"json": "with single quote: it\'s here"}',
|
||||
"Template-like: {variable} with quote: that's it",
|
||||
"Czech text: to je náš systém",
|
||||
"Mixed quotes: \"text with 'inner quotes'\"",
|
||||
]
|
||||
|
||||
for input_text in problematic_inputs:
|
||||
print(f"\n🔬 Testing input: {repr(input_text)}")
|
||||
|
||||
# Simulate what happens with JSON.stringify
|
||||
try:
|
||||
import json as py_json
|
||||
json_result = py_json.dumps(input_text)
|
||||
print(f" JSON.stringify result: {repr(json_result)}")
|
||||
|
||||
# Check if this would cause template issues
|
||||
if "\\'" in json_result:
|
||||
print(f" ⚠️ Contains escaped single quotes - potential template issue")
|
||||
elif '"' in json_result and "'" in json_result:
|
||||
print(f" ⚠️ Contains mixed quotes - potential template issue")
|
||||
else:
|
||||
print(f" ✅ Should be safe for templates")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ❌ JSON conversion failed: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
reproducer = TemplateErrorReproducer()
|
||||
|
||||
print("🔧 Template Error Reproducer")
|
||||
print("🎯 This tool will analyze the exact configuration causing template errors")
|
||||
|
||||
try:
|
||||
# Analyze current configuration
|
||||
config = reproducer.analyze_information_extractor_config('w6Sz5trluur5qdMj')
|
||||
|
||||
# Test problematic inputs
|
||||
reproducer.test_specific_inputs()
|
||||
|
||||
# Create fixed configuration
|
||||
print(f"\n" + "="*60)
|
||||
fixed = reproducer.create_fixed_configuration('w6Sz5trluur5qdMj')
|
||||
|
||||
if fixed:
|
||||
print(f"\n✅ FIXES APPLIED!")
|
||||
print(f"🎯 The template error should now be resolved")
|
||||
print(f"💡 Key fix: Replaced JSON.stringify($json.chunk) with safer alternative")
|
||||
else:
|
||||
print(f"\n❓ No obvious template issues found in configuration")
|
||||
print(f"💡 The error might be caused by specific runtime data")
|
||||
|
||||
except Exception as e:
|
||||
print(f"💥 Analysis failed: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
Reference in New Issue
Block a user