Files
t6_mem0/embedchain/loaders/gmail.py
2023-12-30 20:52:20 +05:30

143 lines
5.1 KiB
Python

import base64
import hashlib
import logging
import os
from email import message_from_bytes
from email.utils import parsedate_to_datetime
from textwrap import dedent
from typing import Dict, List, Optional
from bs4 import BeautifulSoup
try:
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
except ImportError:
raise ImportError(
'Gmail requires extra dependencies. Install with `pip install --upgrade "embedchain[gmail]"`'
) from None
from embedchain.loaders.base_loader import BaseLoader
from embedchain.utils import clean_string
class GmailReader:
    """Read-only Gmail client that fetches and parses messages matching a query.

    A reader is built with a Gmail search query and, optionally, an already
    constructed Gmail API service object. When no service is supplied, one is
    created through the OAuth flow in :meth:`_get_credentials`.
    """

    # OAuth scope requested when authorising against the Gmail API.
    SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

    def __init__(self, query: str, service=None, results_per_page: int = 10):
        """Store the query and obtain a Gmail API service.

        Args:
            query: Search string passed as ``q`` to the ``messages().list`` call.
            service: Pre-built Gmail API service object. When falsy, a service
                is built from local OAuth credentials.
            results_per_page: Stored on the instance. NOTE(review): nothing in
                this class passes it to the list request yet, so it currently
                has no effect on how many messages are fetched.
        """
        self.query = query
        self.service = service or self._initialize_service()
        self.results_per_page = results_per_page

    @staticmethod
    def _initialize_service():
        """Build a Gmail ``v1`` API service from locally stored credentials."""
        credentials = GmailReader._get_credentials()
        return build("gmail", "v1", credentials=credentials)

    @staticmethod
    def _get_credentials():
        """Load, refresh, or interactively obtain OAuth credentials.

        Reads ``credentials.json`` (client secrets) and ``token.json`` (cached
        user token) from the current working directory. When the cached token
        is missing or invalid it is either refreshed or replaced by running the
        installed-app flow on local port 8080, and the result is written back
        to ``token.json``.

        Returns:
            The credentials object to pass to the API client.

        Raises:
            FileNotFoundError: If ``credentials.json`` does not exist.
        """
        if not os.path.exists("credentials.json"):
            raise FileNotFoundError("Missing 'credentials.json'. Download it from your Google Developer account.")

        creds = (
            Credentials.from_authorized_user_file("token.json", GmailReader.SCOPES)
            if os.path.exists("token.json")
            else None
        )

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file("credentials.json", GmailReader.SCOPES)
                creds = flow.run_local_server(port=8080)

            # Persist the refreshed/new token so the next run can reuse it.
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

    def load_emails(self) -> List[Dict]:
        """Fetch and parse the messages returned for ``self.query``.

        Only the messages contained in the response of a single ``list`` call
        are processed; no page token is followed here.

        Returns:
            One dict per message with the keys ``subject``, ``from``, ``to``,
            ``date`` and ``body``.
        """
        response = self.service.users().messages().list(userId="me", q=self.query).execute()
        messages = response.get("messages", [])
        return [self._parse_email(self._get_email(message["id"])) for message in messages]

    def _get_email(self, message_id: str):
        """Download one message in ``raw`` format and return its decoded bytes."""
        raw_message = self.service.users().messages().get(userId="me", id=message_id, format="raw").execute()
        return base64.urlsafe_b64decode(raw_message["raw"])

    def _parse_email(self, raw_email) -> Dict:
        """Parse raw message bytes into a dict of headers, date and body."""
        mime_msg = message_from_bytes(raw_email)
        return {
            "subject": self._get_header(mime_msg, "Subject"),
            "from": self._get_header(mime_msg, "From"),
            "to": self._get_header(mime_msg, "To"),
            "date": self._format_date(mime_msg),
            "body": self._get_body(mime_msg),
        }

    @staticmethod
    def _get_header(mime_msg, header_name: str) -> str:
        """Return the named header, or an empty string when it is absent."""
        return mime_msg.get(header_name, "")

    @staticmethod
    def _format_date(mime_msg) -> Optional[str]:
        """Return the ``Date`` header in ISO 8601 form.

        Returns:
            The ISO-formatted timestamp, or ``None`` when the header is missing
            or cannot be parsed.
        """
        date_header = GmailReader._get_header(mime_msg, "Date")
        if not date_header:
            return None
        try:
            return parsedate_to_datetime(date_header).isoformat()
        except (TypeError, ValueError):
            # parsedate_to_datetime raises ValueError (TypeError on older
            # Pythons) for malformed dates; one bad header must not abort the
            # whole batch of emails.
            return None

    @staticmethod
    def _decode_payload(part) -> str:
        """Decode a non-multipart MIME part's payload into text.

        The declared charset is used when present (UTF-8 otherwise). Bytes
        that are invalid for that charset are replaced rather than raising,
        and an unknown charset name falls back to UTF-8 with replacement.
        """
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""

        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset)
        except UnicodeDecodeError:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # The message declared a charset Python has no codec for.
            return payload.decode("utf-8", errors="replace")

    @staticmethod
    def _get_body(mime_msg) -> str:
        """Extract the message body as text.

        For multipart messages, the first part that is either ``text/plain``
        (and not marked as an attachment) or ``text/html`` is returned. For
        single-part messages the message payload itself is decoded.

        Returns:
            The decoded body, or an empty string when a multipart message has
            no matching text part.
        """
        if not mime_msg.is_multipart():
            return GmailReader._decode_payload(mime_msg)

        for part in mime_msg.walk():
            ctype = part.get_content_type()
            cdispo = str(part.get("Content-Disposition"))

            if ctype == "text/plain" and "attachment" not in cdispo:
                return GmailReader._decode_payload(part)
            elif ctype == "text/html":
                return GmailReader._decode_payload(part)

        return ""
class GmailLoader(BaseLoader):
    """Loader that turns the Gmail messages matching a query into text records."""

    def load_data(self, query: str):
        """Fetch the emails for ``query`` and package them as loader output.

        Args:
            query: Gmail search string handed to ``GmailReader``.

        Returns:
            A dict with ``doc_id`` (hash over the query and all rendered
            contents) and ``data`` (one ``{"content", "meta_data"}`` entry per
            email, where ``meta_data`` is the parsed email dict itself).
        """
        messages = GmailReader(query=query).load_emails()
        logging.info(f"Gmail Loader: {len(messages)} emails found for query '{query}'")

        records = [
            {"content": self._process_email(message), "meta_data": message}
            for message in messages
        ]
        doc_id = self._generate_doc_id(query, records)
        return {"doc_id": doc_id, "data": records}

    @staticmethod
    def _process_email(email: Dict) -> str:
        """Render one parsed email as a short plain-text summary.

        The body is run through BeautifulSoup's ``html.parser`` to extract its
        text, then passed through ``clean_string`` before being placed in the
        template below.
        """
        soup = BeautifulSoup(email["body"], "html.parser")
        content = clean_string(soup.get_text())
        return dedent(
            f"""
Email from '{email['from']}' to '{email['to']}'
Subject: {email['subject']}
Date: {email['date']}
Content: {content}
"""
        )

    @staticmethod
    def _generate_doc_id(query: str, data: List[Dict]) -> str:
        """Return the SHA-256 hex digest of the query followed by all contents.

        The ``content`` values of ``data`` are joined with ``", "`` and
        appended directly to ``query`` before hashing.
        """
        joined_contents = ", ".join(record["content"] for record in data)
        digest = hashlib.sha256((query + joined_contents).encode())
        return digest.hexdigest()