Files
tkb_timeshift/claude_n8n/tools/n8n_assistant.py
Docker Config Backup 8793ac4f59 Add Claude N8N toolkit with Docker mock API server
- Added comprehensive N8N development tools collection
- Added Docker-containerized mock API server for testing
- Added complete documentation and setup guides
- Added mock API server with health checks and data endpoints
- Tools include workflow analyzers, debuggers, and controllers

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-17 21:23:46 +02:00

430 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
"""
N8N Assistant - Main orchestration script that provides a complete interface
for N8N workflow development, testing, and improvement
"""
import os
import sys
import json
import argparse
from typing import Dict, List, Optional, Any
from datetime import datetime
# Add tools directory to path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from n8n_client import N8NClient
from workflow_analyzer import WorkflowAnalyzer
from execution_monitor import ExecutionMonitor, create_simple_monitor
from workflow_improver import WorkflowImprover, TestCase
class N8NAssistant:
"""Main assistant class that orchestrates all N8N workflow operations"""
def __init__(self, config_path: str = "n8n_api_credentials.json"):
"""Initialize N8N Assistant with all tools"""
print("🚀 Initializing N8N Assistant...")
try:
self.client = N8NClient(config_path)
self.analyzer = WorkflowAnalyzer()
self.monitor = create_simple_monitor(self.client)
self.improver = WorkflowImprover(self.client, self.analyzer, self.monitor)
print("✅ N8N Assistant initialized successfully!")
except Exception as e:
print(f"❌ Failed to initialize N8N Assistant: {e}")
sys.exit(1)
def list_workflows(self) -> List[Dict]:
"""List all workflows with basic information"""
try:
workflows = self.client.list_workflows()
print(f"\n📋 Found {len(workflows)} workflows:")
print("-" * 80)
print(f"{'ID':<20} {'Name':<30} {'Active':<8} {'Created'}")
print("-" * 80)
for workflow in workflows:
workflow_id = workflow.get('id', 'N/A')[:18]
name = workflow.get('name', 'Unnamed')[:28]
active = "Yes" if workflow.get('active') else "No"
created = workflow.get('createdAt', 'N/A')[:10]
print(f"{workflow_id:<20} {name:<30} {active:<8} {created}")
return workflows
except Exception as e:
print(f"❌ Error listing workflows: {e}")
return []
def analyze_workflow(self, workflow_id: str, include_executions: bool = True) -> Dict:
"""Perform comprehensive workflow analysis"""
try:
print(f"🔍 Analyzing workflow {workflow_id}...")
# Get workflow details
workflow = self.client.get_workflow(workflow_id)
print(f"📊 Workflow: {workflow.get('name', 'Unnamed')}")
# Get recent executions if requested
executions = []
if include_executions:
executions = self.client.get_executions(workflow_id, limit=20)
print(f"📈 Analyzing {len(executions)} recent executions")
# Generate comprehensive health report
health_report = self.analyzer.generate_health_report(workflow, executions)
# Display results
self._display_analysis_results(health_report)
return {
'workflow': workflow,
'executions': executions,
'health_report': health_report
}
except Exception as e:
print(f"❌ Error analyzing workflow: {e}")
return {}
def test_workflow(self, workflow_id: str, test_data: Optional[Dict] = None,
create_test_suite: bool = False) -> Dict:
"""Test workflow with provided data or generated test suite"""
try:
print(f"🧪 Testing workflow {workflow_id}...")
if create_test_suite:
# Create comprehensive test suite
workflow = self.client.get_workflow(workflow_id)
test_cases = self.improver.create_test_suite(workflow, [test_data] if test_data else [])
print(f"📝 Created {len(test_cases)} test cases")
test_results = self.improver.run_test_suite(workflow_id, test_cases)
else:
# Single test execution
print("🚀 Executing workflow with test data...")
execution_event = self.monitor.execute_and_monitor(workflow_id, test_data)
test_results = [{
'test_name': 'single_execution',
'status': execution_event.status.value,
'duration': execution_event.duration,
'success': execution_event.status.value == 'success',
'execution_id': execution_event.execution_id,
'error_message': execution_event.error_message
}]
# Display test results
self._display_test_results(test_results)
return {
'test_results': test_results,
'success_rate': len([r for r in test_results if r.get('passed', r.get('success'))]) / len(test_results) * 100
}
except Exception as e:
print(f"❌ Error testing workflow: {e}")
return {'test_results': [], 'success_rate': 0}
def improve_workflow(self, workflow_id: str, max_iterations: int = 3) -> Dict:
"""Perform iterative workflow improvement"""
try:
print(f"🔧 Starting iterative improvement for workflow {workflow_id}...")
print(f"📊 Maximum iterations: {max_iterations}")
# Get workflow and create test suite
workflow = self.client.get_workflow(workflow_id)
test_cases = self.improver.create_test_suite(workflow)
# Perform iterative improvement
improvement_results = self.improver.iterative_improvement(
workflow_id, test_cases, max_iterations
)
# Display improvement results
self._display_improvement_results(improvement_results)
return {
'improvement_results': improvement_results,
'total_iterations': len(improvement_results),
'final_success': improvement_results[-1].success if improvement_results else False
}
except Exception as e:
print(f"❌ Error improving workflow: {e}")
return {'improvement_results': [], 'total_iterations': 0, 'final_success': False}
def monitor_workflow(self, workflow_id: str, duration_minutes: int = 60):
"""Monitor workflow executions for specified duration"""
try:
print(f"👁️ Starting monitoring for workflow {workflow_id}")
print(f"⏱️ Duration: {duration_minutes} minutes")
print("📊 Monitoring started (Ctrl+C to stop)...")
# Start monitoring
self.monitor.start_monitoring([workflow_id])
# Keep monitoring for specified duration
import time
time.sleep(duration_minutes * 60)
# Stop monitoring
self.monitor.stop_monitoring()
# Get execution summary
summary = self.monitor.get_execution_summary(hours=duration_minutes/60)
self._display_execution_summary(summary)
except KeyboardInterrupt:
print("\n⏹️ Monitoring stopped by user")
self.monitor.stop_monitoring()
except Exception as e:
print(f"❌ Error monitoring workflow: {e}")
def get_workflow_health(self, workflow_id: str) -> Dict:
"""Get comprehensive workflow health information"""
try:
print(f"🏥 Getting health information for workflow {workflow_id}...")
# Get workflow health statistics
health_stats = self.client.get_workflow_health(workflow_id)
# Get recent executions for detailed analysis
executions = self.client.get_executions(workflow_id, limit=10)
# Analyze error patterns if there are failures
error_patterns = []
if health_stats['error_count'] > 0:
error_patterns = self.analyzer.find_error_patterns(executions)
print(f"📊 Health Statistics:")
print(f" Total Executions (7 days): {health_stats['total_executions']}")
print(f" Success Rate: {health_stats['success_rate']:.1f}%")
print(f" Error Count: {health_stats['error_count']}")
if error_patterns:
print(f"\n🚨 Error Patterns Found:")
for pattern in error_patterns[:3]: # Show top 3 patterns
print(f"{pattern['pattern']}: {pattern['frequency']} occurrences")
return {
'health_stats': health_stats,
'error_patterns': error_patterns,
'recent_executions': executions
}
except Exception as e:
print(f"❌ Error getting workflow health: {e}")
return {}
def debug_execution(self, execution_id: str) -> Dict:
"""Debug a specific workflow execution"""
try:
print(f"🔍 Debugging execution {execution_id}...")
# Get execution details
execution = self.client.get_execution(execution_id)
# Analyze execution logs
analysis = self.analyzer.analyze_execution_logs(execution)
# Get detailed logs
logs = self.monitor.get_execution_logs(execution_id)
# Display debug information
print(f"🚀 Execution Status: {analysis['status']}")
print(f"⏱️ Duration: {analysis['total_duration']:.2f}s")
if analysis['errors']:
print(f"\n❌ Errors Found:")
for error in analysis['errors']:
print(f"{error.get('message', 'Unknown error')}")
if analysis['performance_issues']:
print(f"\n⚠️ Performance Issues:")
for issue in analysis['performance_issues']:
print(f"{issue.get('description', 'Unknown issue')}")
return {
'execution': execution,
'analysis': analysis,
'logs': logs
}
except Exception as e:
print(f"❌ Error debugging execution: {e}")
return {}
def _display_analysis_results(self, health_report):
"""Display workflow analysis results"""
print(f"\n📊 Analysis Results:")
print(f" Health Score: {health_report.health_score:.1f}/100")
print(f" Issues Found: {len(health_report.issues)}")
print(f" Suggestions: {len(health_report.suggestions)}")
if health_report.issues:
print(f"\n🚨 Issues Found:")
for issue in health_report.issues[:5]: # Show top 5 issues
severity = issue.get('severity', 'unknown').upper()
description = issue.get('description', 'No description')
print(f" [{severity}] {description}")
if health_report.suggestions:
print(f"\n💡 Suggestions:")
for suggestion in health_report.suggestions[:5]: # Show top 5 suggestions
print(f"{suggestion}")
if health_report.error_patterns:
print(f"\n🔍 Error Patterns:")
for pattern in health_report.error_patterns[:3]: # Show top 3 patterns
print(f"{pattern['pattern']}: {pattern['frequency']} occurrences")
def _display_test_results(self, test_results):
"""Display test execution results"""
passed = len([r for r in test_results if r.get('passed', r.get('success'))])
total = len(test_results)
print(f"\n🧪 Test Results: {passed}/{total} passed ({passed/total*100:.1f}%)")
for result in test_results:
test_name = result.get('test_name', 'Unknown')
status = "✅ PASS" if result.get('passed', result.get('success')) else "❌ FAIL"
duration = result.get('duration') or result.get('execution_time')
if duration:
print(f" {status} {test_name} ({duration:.2f}s)")
else:
print(f" {status} {test_name}")
if not result.get('passed', result.get('success')) and result.get('error_message'):
print(f" Error: {result['error_message']}")
def _display_improvement_results(self, improvement_results):
"""Display workflow improvement results"""
if not improvement_results:
print("🔧 No improvements were made")
return
print(f"\n🔧 Improvement Results ({len(improvement_results)} iterations):")
for result in improvement_results:
status = "✅ SUCCESS" if result.success else "❌ FAILED"
print(f" Iteration {result.iteration}: {status}")
if result.improvements_made:
for improvement in result.improvements_made:
print(f"{improvement}")
if result.performance_metrics:
metrics = result.performance_metrics
if metrics.get('success_rate_improvement', 0) > 0:
print(f" 📈 Success rate improved by {metrics['success_rate_improvement']*100:.1f}%")
def _display_execution_summary(self, summary):
"""Display execution monitoring summary"""
print(f"\n📊 Execution Summary ({summary['time_period_hours']} hours):")
print(f" Total Executions: {summary['total_executions']}")
print(f" Success Rate: {summary['success_rate']:.1f}%")
print(f" Average Duration: {summary['average_duration_seconds']:.2f}s")
if summary['workflow_statistics']:
print(f"\n📈 Workflow Statistics:")
for workflow_id, stats in summary['workflow_statistics'].items():
success_rate = (stats['success'] / stats['total'] * 100) if stats['total'] > 0 else 0
print(f" {workflow_id[:8]}...: {stats['total']} executions, {success_rate:.1f}% success")
def main():
"""Main CLI interface"""
parser = argparse.ArgumentParser(description="N8N Workflow Assistant")
parser.add_argument("--config", default="n8n_api_credentials.json",
help="Path to N8N API configuration file")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# List workflows command
subparsers.add_parser("list", help="List all workflows")
# Analyze workflow command
analyze_parser = subparsers.add_parser("analyze", help="Analyze workflow")
analyze_parser.add_argument("workflow_id", help="Workflow ID to analyze")
analyze_parser.add_argument("--no-executions", action="store_true",
help="Skip execution analysis")
# Test workflow command
test_parser = subparsers.add_parser("test", help="Test workflow")
test_parser.add_argument("workflow_id", help="Workflow ID to test")
test_parser.add_argument("--data", help="JSON test data file")
test_parser.add_argument("--suite", action="store_true",
help="Create and run comprehensive test suite")
# Improve workflow command
improve_parser = subparsers.add_parser("improve", help="Improve workflow")
improve_parser.add_argument("workflow_id", help="Workflow ID to improve")
improve_parser.add_argument("--iterations", type=int, default=3,
help="Maximum improvement iterations")
# Monitor workflow command
monitor_parser = subparsers.add_parser("monitor", help="Monitor workflow")
monitor_parser.add_argument("workflow_id", help="Workflow ID to monitor")
monitor_parser.add_argument("--duration", type=int, default=60,
help="Monitoring duration in minutes")
# Health check command
health_parser = subparsers.add_parser("health", help="Check workflow health")
health_parser.add_argument("workflow_id", help="Workflow ID to check")
# Debug execution command
debug_parser = subparsers.add_parser("debug", help="Debug execution")
debug_parser.add_argument("execution_id", help="Execution ID to debug")
args = parser.parse_args()
if not args.command:
parser.print_help()
return
# Initialize assistant
assistant = N8NAssistant(args.config)
# Execute command
try:
if args.command == "list":
assistant.list_workflows()
elif args.command == "analyze":
assistant.analyze_workflow(args.workflow_id, not args.no_executions)
elif args.command == "test":
test_data = None
if args.data:
with open(args.data, 'r') as f:
test_data = json.load(f)
assistant.test_workflow(args.workflow_id, test_data, args.suite)
elif args.command == "improve":
assistant.improve_workflow(args.workflow_id, args.iterations)
elif args.command == "monitor":
assistant.monitor_workflow(args.workflow_id, args.duration)
elif args.command == "health":
assistant.get_workflow_health(args.workflow_id)
elif args.command == "debug":
assistant.debug_execution(args.execution_id)
except KeyboardInterrupt:
print("\n👋 Operation cancelled by user")
except Exception as e:
print(f"❌ Error executing command: {e}")
if __name__ == "__main__":
main()