143 lines
5.1 KiB
Python
143 lines
5.1 KiB
Python
import base64
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
from email import message_from_bytes
|
|
from email.utils import parsedate_to_datetime
|
|
from textwrap import dedent
|
|
from typing import Dict, List, Optional
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
try:
|
|
from google.auth.transport.requests import Request
|
|
from google.oauth2.credentials import Credentials
|
|
from google_auth_oauthlib.flow import InstalledAppFlow
|
|
from googleapiclient.discovery import build
|
|
except ImportError:
|
|
raise ImportError(
|
|
'Gmail requires extra dependencies. Install with `pip install --upgrade "embedchain[gmail]"`'
|
|
) from None
|
|
|
|
from embedchain.loaders.base_loader import BaseLoader
|
|
from embedchain.utils import clean_string
|
|
|
|
|
|
class GmailReader:
|
|
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
|
|
|
|
def __init__(self, query: str, service=None, results_per_page: int = 10):
|
|
self.query = query
|
|
self.service = service or self._initialize_service()
|
|
self.results_per_page = results_per_page
|
|
|
|
@staticmethod
|
|
def _initialize_service():
|
|
credentials = GmailReader._get_credentials()
|
|
return build("gmail", "v1", credentials=credentials)
|
|
|
|
@staticmethod
|
|
def _get_credentials():
|
|
if not os.path.exists("credentials.json"):
|
|
raise FileNotFoundError("Missing 'credentials.json'. Download it from your Google Developer account.")
|
|
|
|
creds = (
|
|
Credentials.from_authorized_user_file("token.json", GmailReader.SCOPES)
|
|
if os.path.exists("token.json")
|
|
else None
|
|
)
|
|
|
|
if not creds or not creds.valid:
|
|
if creds and creds.expired and creds.refresh_token:
|
|
creds.refresh(Request())
|
|
else:
|
|
flow = InstalledAppFlow.from_client_secrets_file("credentials.json", GmailReader.SCOPES)
|
|
creds = flow.run_local_server(port=8080)
|
|
with open("token.json", "w") as token:
|
|
token.write(creds.to_json())
|
|
return creds
|
|
|
|
def load_emails(self) -> List[Dict]:
|
|
response = self.service.users().messages().list(userId="me", q=self.query).execute()
|
|
messages = response.get("messages", [])
|
|
|
|
return [self._parse_email(self._get_email(message["id"])) for message in messages]
|
|
|
|
def _get_email(self, message_id: str):
|
|
raw_message = self.service.users().messages().get(userId="me", id=message_id, format="raw").execute()
|
|
return base64.urlsafe_b64decode(raw_message["raw"])
|
|
|
|
def _parse_email(self, raw_email) -> Dict:
|
|
mime_msg = message_from_bytes(raw_email)
|
|
return {
|
|
"subject": self._get_header(mime_msg, "Subject"),
|
|
"from": self._get_header(mime_msg, "From"),
|
|
"to": self._get_header(mime_msg, "To"),
|
|
"date": self._format_date(mime_msg),
|
|
"body": self._get_body(mime_msg),
|
|
}
|
|
|
|
@staticmethod
|
|
def _get_header(mime_msg, header_name: str) -> str:
|
|
return mime_msg.get(header_name, "")
|
|
|
|
@staticmethod
|
|
def _format_date(mime_msg) -> Optional[str]:
|
|
date_header = GmailReader._get_header(mime_msg, "Date")
|
|
return parsedate_to_datetime(date_header).isoformat() if date_header else None
|
|
|
|
@staticmethod
|
|
def _get_body(mime_msg) -> str:
|
|
def decode_payload(part):
|
|
charset = part.get_content_charset() or "utf-8"
|
|
try:
|
|
return part.get_payload(decode=True).decode(charset)
|
|
except UnicodeDecodeError:
|
|
return part.get_payload(decode=True).decode(charset, errors="replace")
|
|
|
|
if mime_msg.is_multipart():
|
|
for part in mime_msg.walk():
|
|
ctype = part.get_content_type()
|
|
cdispo = str(part.get("Content-Disposition"))
|
|
|
|
if ctype == "text/plain" and "attachment" not in cdispo:
|
|
return decode_payload(part)
|
|
elif ctype == "text/html":
|
|
return decode_payload(part)
|
|
else:
|
|
return decode_payload(mime_msg)
|
|
|
|
return ""
|
|
|
|
|
|
class GmailLoader(BaseLoader):
|
|
def load_data(self, query: str):
|
|
reader = GmailReader(query=query)
|
|
emails = reader.load_emails()
|
|
logging.info(f"Gmail Loader: {len(emails)} emails found for query '{query}'")
|
|
|
|
data = []
|
|
for email in emails:
|
|
content = self._process_email(email)
|
|
data.append({"content": content, "meta_data": email})
|
|
|
|
return {"doc_id": self._generate_doc_id(query, data), "data": data}
|
|
|
|
@staticmethod
|
|
def _process_email(email: Dict) -> str:
|
|
content = BeautifulSoup(email["body"], "html.parser").get_text()
|
|
content = clean_string(content)
|
|
return dedent(
|
|
f"""
|
|
Email from '{email['from']}' to '{email['to']}'
|
|
Subject: {email['subject']}
|
|
Date: {email['date']}
|
|
Content: {content}
|
|
"""
|
|
)
|
|
|
|
@staticmethod
|
|
def _generate_doc_id(query: str, data: List[Dict]) -> str:
|
|
content_strings = [email["content"] for email in data]
|
|
return hashlib.sha256((query + ", ".join(content_strings)).encode()).hexdigest()
|