[Feature]: Add posthog anonymous telemetry and update docs (#867)
This commit is contained in:
@@ -15,7 +15,7 @@ class AppConfig(BaseAppConfig):
|
||||
self,
|
||||
log_level: str = "WARNING",
|
||||
id: Optional[str] = None,
|
||||
collect_metrics: Optional[bool] = None,
|
||||
collect_metrics: Optional[bool] = True,
|
||||
collection_name: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
|
||||
@@ -16,7 +16,7 @@ class PipelineConfig(BaseAppConfig):
|
||||
log_level: str = "WARNING",
|
||||
id: Optional[str] = None,
|
||||
name: Optional[str] = None,
|
||||
collect_metrics: Optional[bool] = False,
|
||||
collect_metrics: Optional[bool] = True,
|
||||
):
|
||||
"""
|
||||
Initializes a configuration class instance for an App. This is the simplest form of an embedchain app.
|
||||
|
||||
@@ -1,18 +1,13 @@
|
||||
import hashlib
|
||||
import importlib.metadata
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
import threading
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
from langchain.docstore.document import Document
|
||||
from tenacity import retry, stop_after_attempt, wait_fixed
|
||||
|
||||
from embedchain.chunkers.base_chunker import BaseChunker
|
||||
from embedchain.config import AddConfig, BaseLlmConfig
|
||||
@@ -24,6 +19,7 @@ from embedchain.llm.base import BaseLlm
|
||||
from embedchain.loaders.base_loader import BaseLoader
|
||||
from embedchain.models.data_type import (DataType, DirectDataType,
|
||||
IndirectDataType, SpecialDataType)
|
||||
from embedchain.telemetry.posthog import AnonymousTelemetry
|
||||
from embedchain.utils import detect_datatype
|
||||
from embedchain.vectordb.base import BaseVectorDB
|
||||
|
||||
@@ -89,9 +85,8 @@ class EmbedChain(JSONSerializable):
|
||||
self.user_asks = []
|
||||
|
||||
# Send anonymous telemetry
|
||||
self.s_id = self.config.id if self.config.id else str(uuid.uuid4())
|
||||
self.u_id = self._load_or_generate_user_id()
|
||||
|
||||
self._telemetry_props = {"class": self.__class__.__name__}
|
||||
self.telemetry = AnonymousTelemetry(enabled=self.config.collect_metrics)
|
||||
# Establish a connection to the SQLite database
|
||||
self.connection = sqlite3.connect(SQLITE_PATH)
|
||||
self.cursor = self.connection.cursor()
|
||||
@@ -111,12 +106,8 @@ class EmbedChain(JSONSerializable):
|
||||
"""
|
||||
)
|
||||
self.connection.commit()
|
||||
|
||||
# NOTE: Uncomment the next two lines when running tests to see if any test fires a telemetry event.
|
||||
# if (self.config.collect_metrics):
|
||||
# raise ConnectionRefusedError("Collection of metrics should not be allowed.")
|
||||
thread_telemetry = threading.Thread(target=self._send_telemetry_event, args=("init",))
|
||||
thread_telemetry.start()
|
||||
# Send anonymous telemetry
|
||||
self.telemetry.capture(event_name="init", properties=self._telemetry_props)
|
||||
|
||||
@property
|
||||
def collect_metrics(self):
|
||||
@@ -138,29 +129,6 @@ class EmbedChain(JSONSerializable):
|
||||
raise ValueError(f"Boolean value expected but got {type(value)}.")
|
||||
self.llm.online = value
|
||||
|
||||
def _load_or_generate_user_id(self) -> str:
|
||||
"""
|
||||
Loads the user id from the config file if it exists, otherwise generates a new
|
||||
one and saves it to the config file.
|
||||
|
||||
:return: user id
|
||||
:rtype: str
|
||||
"""
|
||||
if not os.path.exists(CONFIG_DIR):
|
||||
os.makedirs(CONFIG_DIR)
|
||||
|
||||
if os.path.exists(CONFIG_FILE):
|
||||
with open(CONFIG_FILE, "r") as f:
|
||||
data = json.load(f)
|
||||
if "user_id" in data:
|
||||
return data["user_id"]
|
||||
|
||||
u_id = str(uuid.uuid4())
|
||||
with open(CONFIG_FILE, "w") as f:
|
||||
json.dump({"user_id": u_id}, f)
|
||||
|
||||
return u_id
|
||||
|
||||
def add(
|
||||
self,
|
||||
source: Any,
|
||||
@@ -259,9 +227,14 @@ class EmbedChain(JSONSerializable):
|
||||
# it's quicker to check the variable twice than to count words when they won't be submitted.
|
||||
word_count = data_formatter.chunker.get_word_count(documents)
|
||||
|
||||
extra_metadata = {"data_type": data_type.value, "word_count": word_count, "chunks_count": new_chunks}
|
||||
thread_telemetry = threading.Thread(target=self._send_telemetry_event, args=("add", extra_metadata))
|
||||
thread_telemetry.start()
|
||||
# Send anonymous telemetry
|
||||
event_properties = {
|
||||
**self._telemetry_props,
|
||||
"data_type": data_type.value,
|
||||
"word_count": word_count,
|
||||
"chunks_count": new_chunks,
|
||||
}
|
||||
self.telemetry.capture(event_name="add", properties=event_properties)
|
||||
|
||||
return source_hash
|
||||
|
||||
@@ -535,9 +508,7 @@ class EmbedChain(JSONSerializable):
|
||||
answer = self.llm.query(input_query=input_query, contexts=contexts, config=config, dry_run=dry_run)
|
||||
|
||||
# Send anonymous telemetry
|
||||
thread_telemetry = threading.Thread(target=self._send_telemetry_event, args=("query",))
|
||||
thread_telemetry.start()
|
||||
|
||||
self.telemetry.capture(event_name="query", properties=self._telemetry_props)
|
||||
return answer
|
||||
|
||||
def chat(
|
||||
@@ -569,10 +540,8 @@ class EmbedChain(JSONSerializable):
|
||||
"""
|
||||
contexts = self.retrieve_from_database(input_query=input_query, config=config, where=where)
|
||||
answer = self.llm.chat(input_query=input_query, contexts=contexts, config=config, dry_run=dry_run)
|
||||
|
||||
# Send anonymous telemetry
|
||||
thread_telemetry = threading.Thread(target=self._send_telemetry_event, args=("chat",))
|
||||
thread_telemetry.start()
|
||||
self.telemetry.capture(event_name="chat", properties=self._telemetry_props)
|
||||
|
||||
return answer
|
||||
|
||||
@@ -608,34 +577,8 @@ class EmbedChain(JSONSerializable):
|
||||
Resets the database. Deletes all embeddings irreversibly.
|
||||
`App` does not have to be reinitialized after using this method.
|
||||
"""
|
||||
# Send anonymous telemetry
|
||||
thread_telemetry = threading.Thread(target=self._send_telemetry_event, args=("reset",))
|
||||
thread_telemetry.start()
|
||||
|
||||
self.db.reset()
|
||||
self.cursor.execute("DELETE FROM data_sources WHERE pipeline_id = ?", (self.config.id,))
|
||||
self.connection.commit()
|
||||
|
||||
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
|
||||
def _send_telemetry_event(self, method: str, extra_metadata: Optional[dict] = None):
|
||||
"""
|
||||
Send telemetry event to the embedchain server. This is anonymous. It can be toggled off in `AppConfig`.
|
||||
"""
|
||||
if not self.config.collect_metrics:
|
||||
return
|
||||
|
||||
with threading.Lock():
|
||||
url = "https://api.embedchain.ai/api/v1/telemetry/"
|
||||
metadata = {
|
||||
"s_id": self.s_id,
|
||||
"version": importlib.metadata.version(__package__ or __name__),
|
||||
"method": method,
|
||||
"language": "py",
|
||||
"u_id": self.u_id,
|
||||
}
|
||||
if extra_metadata:
|
||||
metadata.update(extra_metadata)
|
||||
|
||||
response = requests.post(url, json={"metadata": metadata})
|
||||
if response.status_code != 200:
|
||||
logging.warning(f"Telemetry event failed with status code {response.status_code}")
|
||||
# Send anonymous telemetry
|
||||
self.telemetry.capture(event_name="reset", properties=self._telemetry_props)
|
||||
|
||||
@@ -18,6 +18,7 @@ from embedchain.factory import EmbedderFactory, LlmFactory, VectorDBFactory
|
||||
from embedchain.helper.json_serializable import register_deserializable
|
||||
from embedchain.llm.base import BaseLlm
|
||||
from embedchain.llm.openai import OpenAILlm
|
||||
from embedchain.telemetry.posthog import AnonymousTelemetry
|
||||
from embedchain.vectordb.base import BaseVectorDB
|
||||
from embedchain.vectordb.chroma import ChromaDB
|
||||
|
||||
@@ -109,8 +110,9 @@ class Pipeline(EmbedChain):
|
||||
self.llm = llm or OpenAILlm()
|
||||
self._init_db()
|
||||
|
||||
# setup user id and directory
|
||||
self.u_id = self._load_or_generate_user_id()
|
||||
# Send anonymous telemetry
|
||||
self._telemetry_props = {"class": self.__class__.__name__}
|
||||
self.telemetry = AnonymousTelemetry(enabled=self.config.collect_metrics)
|
||||
|
||||
# Establish a connection to the SQLite database
|
||||
self.connection = sqlite3.connect(SQLITE_PATH)
|
||||
@@ -131,8 +133,10 @@ class Pipeline(EmbedChain):
|
||||
"""
|
||||
)
|
||||
self.connection.commit()
|
||||
# Send anonymous telemetry
|
||||
self.telemetry.capture(event_name="init", properties=self._telemetry_props)
|
||||
|
||||
self.user_asks = [] # legacy defaults
|
||||
self.user_asks = []
|
||||
if self.auto_deploy:
|
||||
self.deploy()
|
||||
|
||||
@@ -219,6 +223,9 @@ class Pipeline(EmbedChain):
|
||||
"""
|
||||
Search for similar documents related to the query in the vector database.
|
||||
"""
|
||||
# Send anonymous telemetry
|
||||
self.telemetry.capture(event_name="search", properties=self._telemetry_props)
|
||||
|
||||
# TODO: Search will call the endpoint rather than fetching the data from the db itself when deploy=True.
|
||||
if self.id is None:
|
||||
where = {"app_id": self.local_id}
|
||||
@@ -312,6 +319,9 @@ class Pipeline(EmbedChain):
|
||||
data_hash, data_type, data_value = result[1], result[2], result[3]
|
||||
self._process_and_upload_data(data_hash, data_type, data_value)
|
||||
|
||||
# Send anonymous telemetry
|
||||
self.telemetry.capture(event_name="deploy", properties=self._telemetry_props)
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, yaml_path: str, auto_deploy: bool = False):
|
||||
"""
|
||||
@@ -347,6 +357,11 @@ class Pipeline(EmbedChain):
|
||||
embedding_model = EmbedderFactory.create(
|
||||
embedding_model_provider, embedding_model_config_data.get("config", {})
|
||||
)
|
||||
|
||||
# Send anonymous telemetry
|
||||
event_properties = {"init_type": "yaml_config"}
|
||||
AnonymousTelemetry().capture(event_name="init", properties=event_properties)
|
||||
|
||||
return cls(
|
||||
config=pipeline_config,
|
||||
llm=llm,
|
||||
|
||||
0
embedchain/telemetry/__init__.py
Normal file
0
embedchain/telemetry/__init__.py
Normal file
67
embedchain/telemetry/posthog.py
Normal file
67
embedchain/telemetry/posthog.py
Normal file
@@ -0,0 +1,67 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
from posthog import Posthog
|
||||
|
||||
import embedchain
|
||||
|
||||
HOME_DIR = str(Path.home())
|
||||
CONFIG_DIR = os.path.join(HOME_DIR, ".embedchain")
|
||||
CONFIG_FILE = os.path.join(CONFIG_DIR, "config.json")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AnonymousTelemetry:
|
||||
def __init__(self, host="https://app.posthog.com", enabled=True):
|
||||
self.project_api_key = "phc_XnMmNHzwxE7PVHX4mD2r8K6nfxVM48a2sq2U3N1p2lO"
|
||||
self.host = host
|
||||
self.posthog = Posthog(project_api_key=self.project_api_key, host=self.host)
|
||||
self.user_id = self.get_user_id()
|
||||
self.enabled = enabled
|
||||
|
||||
# Check if telemetry tracking is disabled via environment variable
|
||||
if "EC_TELEMETRY" in os.environ and os.environ["EC_TELEMETRY"].lower() not in [
|
||||
"1",
|
||||
"true",
|
||||
"yes",
|
||||
]:
|
||||
self.enabled = False
|
||||
|
||||
if not self.enabled:
|
||||
self.posthog.disabled = True
|
||||
|
||||
# Silence posthog logging
|
||||
posthog_logger = logging.getLogger("posthog")
|
||||
posthog_logger.disabled = True
|
||||
|
||||
def get_user_id(self):
|
||||
if not os.path.exists(CONFIG_DIR):
|
||||
os.makedirs(CONFIG_DIR)
|
||||
|
||||
if os.path.exists(CONFIG_FILE):
|
||||
with open(CONFIG_FILE, "r") as f:
|
||||
data = json.load(f)
|
||||
if "user_id" in data:
|
||||
return data["user_id"]
|
||||
|
||||
user_id = str(uuid.uuid4())
|
||||
with open(CONFIG_FILE, "w") as f:
|
||||
json.dump({"user_id": user_id}, f)
|
||||
return user_id
|
||||
|
||||
def capture(self, event_name, properties=None):
|
||||
default_properties = {
|
||||
"version": embedchain.__version__,
|
||||
"language": "python",
|
||||
"pid": os.getpid(),
|
||||
}
|
||||
properties.update(default_properties)
|
||||
|
||||
try:
|
||||
self.posthog.capture(self.user_id, event_name, properties)
|
||||
except Exception:
|
||||
logger.exception(f"Failed to send telemetry {event_name=}")
|
||||
@@ -38,7 +38,7 @@ class ChromaDB(BaseVectorDB):
|
||||
else:
|
||||
self.config = ChromaDbConfig()
|
||||
|
||||
self.settings = Settings()
|
||||
self.settings = Settings(anonymized_telemetry=False)
|
||||
self.settings.allow_reset = self.config.allow_reset if hasattr(self.config, "allow_reset") else False
|
||||
if self.config.chroma_settings:
|
||||
for key, value in self.config.chroma_settings.items():
|
||||
|
||||
Reference in New Issue
Block a user