Compare commits

...

4 Commits

Author           SHA1        Message                                               Date
pablohashescobar 024a870e50  chore: optimize worker startups                       2024-12-20 11:41:49 +05:30
pablohashescobar 5ba921edd8  chore: add task queue separations                     2024-12-16 17:59:25 +05:30
pablohashescobar 08ba25a8f8  chore: add queue distribution for background workers  2024-12-16 15:53:06 +05:30
pablohashescobar bc54bed157  chore: add celery backend setup and cleanup settings  2024-12-16 14:22:39 +05:30
28 changed files with 226 additions and 77 deletions

View File

@@ -1,3 +1,3 @@
 web: gunicorn -w 4 -k uvicorn.workers.UvicornWorker plane.asgi:application --bind 0.0.0.0:$PORT --max-requests 10000 --max-requests-jitter 1000 --access-logfile -
-worker: celery -A plane worker -l info
+worker: celery -A plane worker -O fair --prefetch-multiplier=1 --max-tasks-per-child=100 --max-memory-per-child=150000 -l INFO
 beat: celery -A plane beat -l INFO
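
The rewritten worker line trades raw throughput for predictability: -O fair hands each task to the next free process instead of letting busy processes hoard prefetched work, --prefetch-multiplier=1 reserves only one message per process (sensible for long-running jobs), and the per-child caps recycle worker processes to contain slow memory leaks; --max-memory-per-child is measured in kilobytes, so 150000 is roughly 146 MB. As a rough sketch, the same limits can be set as Celery configuration instead of CLI flags; these are standard Celery setting names, and the CELERY_CONFIGURATIONS block later in this diff does exactly that:

    app.conf.update(
        worker_prefetch_multiplier=1,        # reserve one task per process
        worker_max_tasks_per_child=100,      # recycle a process after 100 tasks
        worker_max_memory_per_child=150000,  # in KB; recycle above ~146 MB resident
    )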

View File

@@ -1,10 +1,18 @@
-from django.utils import timezone
+# Python imports
 from datetime import timedelta
-from plane.db.models import APIActivityLog
+# Django imports
+from django.utils import timezone
+from django.conf import settings
+# Third party imports
 from celery import shared_task
+# Module imports
+from plane.db.models import APIActivityLog
-@shared_task
+@shared_task(queue=settings.TASK_SCHEDULER_QUEUE)
 def delete_api_logs():
     # Get the logs older than 30 days to delete
     logs_to_delete = APIActivityLog.objects.filter(
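
Binding a queue in the decorator, as above, makes every .delay() or .apply_async() call route there by default, but nothing consumes the queue until a worker subscribes to it. A minimal sketch of the behaviour, assuming the queue names this diff defines in settings (TASK_SCHEDULER_QUEUE resolves to "plane_schedule" under the default QUEUE_SUFFIX of "plane"):

    from celery import shared_task

    @shared_task(queue="plane_schedule")
    def delete_api_logs():
        ...

    delete_api_logs.delay()  # routed to plane_schedule
    delete_api_logs.apply_async(queue="plane_high")  # per-call override still works

A worker must list the queue in its -Q option (for example celery -A plane worker -Q plane_schedule) or the messages simply accumulate on the broker.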

View File

@@ -10,7 +10,7 @@ from django.db.models.fields.related import OneToOneRel
 from celery import shared_task
-@shared_task
+@shared_task(queue=settings.TASK_DEFAULT_QUEUE)
 def soft_delete_related_objects(app_label, model_name, instance_pk, using=None):
     """
     Soft delete related objects for a given model instance
@@ -111,7 +111,7 @@ def restore_related_objects(app_label, model_name, instance_pk, using=None):
     pass
-@shared_task
+@shared_task(queue=settings.TASK_SCHEDULER_QUEUE)
 def hard_delete():
     from plane.db.models import (
         Workspace,

View File

@@ -5,6 +5,7 @@ from datetime import datetime, timedelta
 # Django imports
 from django.db.models import Max
+from django.conf import settings
 # Third party imports
 from celery import shared_task
@@ -531,7 +532,7 @@ def create_module_issues(workspace, project, user_id, issue_count):
     )
-@shared_task
+@shared_task(queue=settings.TASK_LOW_QUEUE)
 def create_dummy_data(
     slug,
     email,

View File

@@ -2,16 +2,16 @@ import logging
 import re
 from datetime import datetime
-from bs4 import BeautifulSoup
-# Third party imports
-from celery import shared_task
+# Django imports
 from django.core.mail import EmailMultiAlternatives, get_connection
 from django.template.loader import render_to_string
-# Django imports
 from django.utils import timezone
 from django.utils.html import strip_tags
+from django.conf import settings
+# Third party imports
+from bs4 import BeautifulSoup
+from celery import shared_task
 # Module imports
 from plane.db.models import EmailNotificationLog, Issue, User
@@ -168,7 +168,7 @@ def process_html_content(content):
     return processed_content_list
-@shared_task
+@shared_task(queue=settings.TASK_NOTIFICATION_QUEUE)
 def send_email_notification(
     issue_id, notification_data, receiver_id, email_notification_ids
 ):

View File

@@ -1,6 +1,10 @@
 # Python imports
 import os
 import uuid
+# Django imports
+from django.conf import settings
 # third party imports
 from celery import shared_task
 from posthog import Posthog
@@ -49,7 +53,7 @@ def auth_events(user, email, user_agent, ip, event_name, medium, first_time):
     return
-@shared_task
+@shared_task(queue=settings.TASK_LOW_QUEUE)
 def workspace_invite_event(user, email, user_agent, ip, event_name, accepted_from):
     try:
         POSTHOG_API_KEY, POSTHOG_HOST = posthogConfiguration()

View File

@@ -7,12 +7,13 @@ import zipfile
 import boto3
 from botocore.client import Config
-# Third party imports
-from celery import shared_task
 # Django imports
 from django.conf import settings
 from django.utils import timezone
+# Third party imports
+from celery import shared_task
 from openpyxl import Workbook
 # Module imports
@@ -299,7 +300,7 @@ def generate_xlsx(header, project_id, issues, files):
     files.append((f"{project_id}.xlsx", xlsx_file))
-@shared_task
+@shared_task(queue=settings.TASK_LOW_QUEUE)
 def issue_export_task(provider, workspace_id, project_ids, token_id, multiple, slug):
     try:
         exporter_instance = ExporterHistory.objects.get(token=token_id)

View File

@@ -15,7 +15,7 @@ from botocore.client import Config
 from plane.db.models import ExporterHistory
-@shared_task
+@shared_task(queue=settings.TASK_SCHEDULER_QUEUE)
 def delete_old_s3_link():
     # Get a list of keys and IDs to process
     expired_exporter_history = ExporterHistory.objects.filter(

View File

@@ -5,6 +5,7 @@ from datetime import timedelta
 # Django imports
 from django.utils import timezone
 from django.db.models import Q
+from django.conf import settings
 # Third party imports
 from celery import shared_task
@@ -13,7 +14,7 @@ from celery import shared_task
 from plane.db.models import FileAsset
-@shared_task
+@shared_task(queue=settings.TASK_HIGH_QUEUE)
 def delete_unuploaded_file_asset():
     """This task deletes unuploaded file assets older than a certain number of days."""
     FileAsset.objects.filter(

View File

@@ -1,21 +1,22 @@
 # Python imports
 import logging
-# Third party imports
-from celery import shared_task
 # Django imports
-# Third party imports
 from django.core.mail import EmailMultiAlternatives, get_connection
 from django.template.loader import render_to_string
 from django.utils.html import strip_tags
+from django.conf import settings
+# Third party imports
+from celery import shared_task
 # Module imports
 from plane.license.utils.instance_value import get_email_configuration
 from plane.utils.exception_logger import log_exception
-@shared_task
+@shared_task(queue=settings.TASK_HIGH_QUEUE)
 def forgot_password(first_name, email, uidb64, token, current_site):
     try:
         relative_link = (

View File

@@ -2,17 +2,17 @@
 import json
-# Third Party imports
-from celery import shared_task
 # Django imports
 from django.core.serializers.json import DjangoJSONEncoder
 from django.utils import timezone
+from django.conf import settings
-from plane.app.serializers import IssueActivitySerializer
-from plane.bgtasks.notification_task import notifications
+# Third Party imports
+from celery import shared_task
 # Module imports
+from plane.app.serializers import IssueActivitySerializer
+from plane.bgtasks.notification_task import notifications
 from plane.db.models import (
     CommentReaction,
     Cycle,
@@ -1548,7 +1548,7 @@ def create_intake_activity(
 # Receive message from room group
-@shared_task
+@shared_task(queue=settings.TASK_HIGH_QUEUE)
 def issue_activity(
     type,
     requested_data,

View File

@@ -2,12 +2,13 @@
 import json
 from datetime import timedelta
+# Django imports
+from django.db.models import Q
+from django.utils import timezone
+from django.conf import settings
 # Third party imports
 from celery import shared_task
-from django.db.models import Q
-# Django imports
-from django.utils import timezone
 # Module imports
 from plane.bgtasks.issue_activities_task import issue_activity
@@ -15,7 +16,7 @@ from plane.db.models import Issue, Project, State
 from plane.utils.exception_logger import log_exception
-@shared_task
+@shared_task(queue=settings.TASK_SCHEDULER_QUEUE)
 def archive_and_close_old_issues():
     archive_old_issues()
     close_old_issues()

View File

@@ -1,9 +1,16 @@
-from celery import shared_task
-from django.db import transaction
-from django.utils import timezone
+# Python imports
 from typing import Optional, Dict
+import json
+# Django imports
+from django.db import transaction
+from django.utils import timezone
+from django.conf import settings
+# Third party imports
+from celery import shared_task
 # Module imports
 from plane.db.models import Issue, IssueDescriptionVersion
 from plane.utils.exception_logger import log_exception
@@ -39,7 +46,7 @@ def update_existing_version(version: IssueDescriptionVersion, issue) -> None:
     )
-@shared_task
+@shared_task(queue=settings.TASK_HIGH_QUEUE)
 def issue_description_version_task(
     updated_issue, issue_id, user_id, is_creating=False
 ) -> Optional[bool]:

View File

@@ -1,21 +1,21 @@
 # Python imports
 import logging
-# Third party imports
-from celery import shared_task
 # Django imports
-# Third party imports
 from django.core.mail import EmailMultiAlternatives, get_connection
 from django.template.loader import render_to_string
 from django.utils.html import strip_tags
+from django.conf import settings
+# Third party imports
+from celery import shared_task
 # Module imports
 from plane.license.utils.instance_value import get_email_configuration
 from plane.utils.exception_logger import log_exception
-@shared_task
+@shared_task(queue=settings.TASK_NOTIFICATION_QUEUE)
 def magic_link(email, key, token, current_site):
     try:
         (

View File

@@ -3,6 +3,9 @@ import json
 import uuid
 from uuid import UUID
+# Django imports
+from django.conf import settings
 # Module imports
 from plane.db.models import (
@@ -204,7 +207,7 @@ def create_mention_notification(
     )
-@shared_task
+@shared_task(queue=settings.TASK_NOTIFICATION_QUEUE)
 def notifications(
     type,
     issue_id,

View File

@@ -3,13 +3,14 @@ import json
 # Django imports
 from django.utils import timezone
+from django.conf import settings
 # Third-party imports
 from bs4 import BeautifulSoup
+from celery import shared_task
 # Module imports
 from plane.db.models import Page, PageLog
-from celery import shared_task
 from plane.utils.exception_logger import log_exception
@@ -33,7 +34,7 @@ def extract_components(value, tag):
     return []
-@shared_task
+@shared_task(queue=settings.TASK_LOW_QUEUE)
 def page_transaction(new_value, old_value, page_id):
     try:
         page = Page.objects.get(pk=page_id)

View File

@@ -4,12 +4,15 @@ import json
 # Third party imports
 from celery import shared_task
+# Django imports
+from django.conf import settings
 # Module imports
 from plane.db.models import Page, PageVersion
 from plane.utils.exception_logger import log_exception
-@shared_task
+@shared_task(queue=settings.TASK_DEFAULT_QUEUE)
 def page_version(page_id, existing_instance, user_id):
     try:
         # Get the page

View File

@@ -1,13 +1,15 @@
 # Python imports
 import logging
-# Third party imports
-from celery import shared_task
-# Third party imports
+# Django imports
 from django.core.mail import EmailMultiAlternatives, get_connection
 from django.template.loader import render_to_string
 from django.utils.html import strip_tags
+from django.conf import settings
+# Third party imports
+from celery import shared_task
 # Module imports
@@ -17,7 +19,7 @@ from plane.db.models import ProjectMember
 from plane.db.models import User
-@shared_task
+@shared_task(queue=settings.TASK_DEFAULT_QUEUE)
 def project_add_user_email(current_site, project_member_id, invitor_id):
     try:
         # Get the invitor

View File

@@ -1,14 +1,15 @@
 # Python imports
 import logging
-# Third party imports
-from celery import shared_task
 # Django imports
-# Third party imports
 from django.core.mail import EmailMultiAlternatives, get_connection
 from django.template.loader import render_to_string
 from django.utils.html import strip_tags
+from django.conf import settings
+# Third party imports
+from celery import shared_task
 # Module imports
 from plane.db.models import Project, ProjectMemberInvite, User
@@ -16,7 +17,7 @@ from plane.license.utils.instance_value import get_email_configuration
 from plane.utils.exception_logger import log_exception
-@shared_task
+@shared_task(queue=settings.TASK_DEFAULT_QUEUE)
 def project_invitation(email, project_id, token, current_site, invitor):
     try:
         user = User.objects.get(email=invitor)

View File

@@ -1,5 +1,6 @@
 # Python imports
 from django.utils import timezone
+from django.conf import settings
 # Third party imports
 from celery import shared_task
@@ -7,11 +8,18 @@ from celery import shared_task
 # Module imports
 from plane.db.models import UserRecentVisit, Workspace
 from plane.utils.exception_logger import log_exception
+from plane.settings.redis import redis_instance
-@shared_task
+@shared_task(queue=settings.TASK_LOW_QUEUE)
 def recent_visited_task(entity_name, entity_identifier, user_id, project_id, slug):
     try:
+        ri = redis_instance()
+        # Check if the same entity is set in redis for the user
+        if ri.exists(f"recent_visited:{user_id}:{entity_name}:{entity_identifier}"):
+            return
+        # Check if the same entity is set in redis for the workspace
         workspace = Workspace.objects.get(slug=slug)
         recent_visited = UserRecentVisit.objects.filter(
             entity_name=entity_name,
@@ -50,6 +58,10 @@ def recent_visited_task(entity_name, entity_identifier, user_id, project_id, slu
             recent_activity.updated_by_id = user_id
             recent_activity.save(update_fields=["created_by_id", "updated_by_id"])
+        # Set in redis
+        ri.set(
+            f"recent_visited:{user_id}:{entity_name}:{entity_identifier}", 1, ex=60 * 10
+        )
         return
     except Exception as e:
         log_exception(e)
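
The redis guard added above makes this task debounce itself: the first run in a 10-minute window does the work and sets a key with ex=60 * 10, and later runs hit the exists() check and return early. The exists()/set() pair is not atomic, so two concurrent runs can both pass the check; for a visit tracker that occasional double write is harmless. Where strict once-per-window semantics matter, the same idea collapses into one atomic command. A sketch with redis-py, reusing the key layout from the task (the helper name is ours, not the project's):

    import redis

    r = redis.Redis()

    def first_visit_in_window(user_id: str, entity_name: str, entity_id: str) -> bool:
        key = f"recent_visited:{user_id}:{entity_name}:{entity_id}"
        # SET NX EX is atomic: only the first caller in the window succeeds
        return bool(r.set(key, 1, nx=True, ex=60 * 10))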

View File

@@ -1,3 +1,6 @@
+# Django imports
+from django.conf import settings
 # Third party imports
 from celery import shared_task
@@ -7,7 +10,7 @@ from plane.settings.storage import S3Storage
 from plane.utils.exception_logger import log_exception
-@shared_task
+@shared_task(queue=settings.TASK_DEFAULT_QUEUE)
 def get_asset_object_metadata(asset_id):
     try:
         # Get the asset

View File

@@ -5,6 +5,7 @@ import logging
 from django.core.mail import EmailMultiAlternatives, get_connection
 from django.template.loader import render_to_string
 from django.utils.html import strip_tags
+from django.conf import settings
 # Third party imports
 from celery import shared_task
@@ -15,7 +16,7 @@ from plane.license.utils.instance_value import get_email_configuration
 from plane.utils.exception_logger import log_exception
-@shared_task
+@shared_task(queue=settings.TASK_HIGH_QUEUE)
 def user_activation_email(current_site, user_id):
     try:
         # Send email to user when account is activated

View File

@@ -5,6 +5,7 @@ import logging
 from django.core.mail import EmailMultiAlternatives, get_connection
 from django.template.loader import render_to_string
 from django.utils.html import strip_tags
+from django.conf import settings
 # Third party imports
 from celery import shared_task
@@ -15,7 +16,7 @@ from plane.license.utils.instance_value import get_email_configuration
 from plane.utils.exception_logger import log_exception
-@shared_task
+@shared_task(queue=settings.TASK_DEFAULT_QUEUE)
 def user_deactivation_email(current_site, user_id):
     try:
         # Send email to user when account is deactivated

View File

@@ -86,6 +86,7 @@ def get_model_data(event, event_id, many=False):
     retry_backoff=600,
     max_retries=5,
     retry_jitter=True,
+    queue=settings.TASK_NOTIFICATION_QUEUE,
 )
 def webhook_task(self, webhook, slug, event, event_data, action, current_site):
     try:
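
webhook_task keeps its existing retry profile and only gains a queue here. For reference, these are standard Celery task options: a numeric retry_backoff is the base delay in seconds for exponential backoff, retry_jitter randomizes the delays so failing endpoints are not hammered in lockstep, and max_retries=5 caps the attempts. A minimal sketch of the same profile on a hypothetical bound task (the exception type and body are placeholders, not the project's code):

    from celery import shared_task

    @shared_task(
        bind=True,
        autoretry_for=(ConnectionError,),  # retry automatically on delivery failures
        retry_backoff=600,                 # exponential backoff starting at 600 s
        retry_jitter=True,                 # randomize delays between attempts
        max_retries=5,
    )
    def deliver_webhook(self, payload):
        ...  # POST the payload; raising ConnectionError triggers a retry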

View File

@@ -8,6 +8,7 @@ from celery import shared_task
 from django.core.mail import EmailMultiAlternatives, get_connection
 from django.template.loader import render_to_string
 from django.utils.html import strip_tags
+from django.conf import settings
 # Module imports
 from plane.db.models import User, Workspace, WorkspaceMemberInvite
@@ -15,7 +16,7 @@ from plane.license.utils.instance_value import get_email_configuration
 from plane.utils.exception_logger import log_exception
-@shared_task
+@shared_task(queue=settings.TASK_DEFAULT_QUEUE)
 def workspace_invitation(email, workspace_id, token, current_site, invitor):
     try:
         user = User.objects.get(email=invitor)

View File

@@ -14,6 +14,39 @@ app = Celery("plane")
 # pickle the object when using Windows.
 app.config_from_object("django.conf:settings", namespace="CELERY")
+# Add additional configurations
+CELERY_CONFIGURATIONS = {
+    "worker_prefetch_multiplier": int(os.environ.get("WORKER_PREFETCH_MULTIPLIER", 1)),
+    "worker_max_tasks_per_child": int(
+        os.environ.get("WORKER_MAX_TASKS_PER_CHILD", 100)
+    ),
+    "worker_max_memory_per_child": int(
+        os.environ.get("WORKER_MAX_MEMORY_PER_CHILD", 150000)
+    ),
+    "task_time_limit": int(os.environ.get("TASK_TIME_LIMIT", 3600)),  # Hard time limit
+    "task_soft_time_limit": int(
+        os.environ.get("TASK_SOFT_TIME_LIMIT", 1800)
+    ),  # Soft time limit (30 minutes)
+    "worker_send_task_events": bool(
+        os.environ.get("WORKER_SEND_TASK_EVENTS", "0") == "1"
+    ),
+    "task_ignore_result": bool(
+        os.environ.get("TASK_IGNORE_RESULT", "1") == "1"
+    ),  # Ignore results unless explicitly needed
+    "task_store_errors_even_if_ignored": bool(
+        os.environ.get("TASK_STORE_ERRORS_EVEN_IF_IGNORED", "1") == "1"
+    ),  # Store errors even if results are ignored
+    "task_acks_late": bool(
+        os.environ.get("TASK_ACKS_LATE", "1") == "1"
+    ),  # Acknowledge tasks after completion
+    "task_reject_on_worker_lost": bool(
+        os.environ.get("TASK_REJECT_ON_WORKER_LOST", "1") == "1"
+    ),  # Reject tasks if worker is lost
+}
+app.conf.update(**CELERY_CONFIGURATIONS)
 app.conf.beat_schedule = {
     # Executes every day at 12 AM
     "check-every-day-to-archive-and-close": {

View File

@@ -1,3 +1,6 @@
+# Django imports
+from django.conf import settings
 # Third party imports
 from celery import shared_task
 from opentelemetry import trace
@@ -19,7 +22,7 @@ from plane.db.models import (
 from plane.utils.telemetry import init_tracer, shutdown_tracer
-@shared_task
+@shared_task(queue=settings.TASK_LOW_QUEUE)
 def instance_traces():
     try:
         init_tracer()

View File

@@ -3,7 +3,7 @@
 # Python imports
 import os
 from urllib.parse import urlparse
+from kombu import Queue, Exchange
 # Third party imports
 import dj_database_url
@@ -146,8 +146,20 @@ else:
     }
 }
+REDIS_HOST = os.environ.get("REDIS_HOST")
+REDIS_PORT = os.environ.get("REDIS_PORT", "6379")
+REDIS_USER = os.environ.get("REDIS_USER", "")
+REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD", "")
 # Redis Config
-REDIS_URL = os.environ.get("REDIS_URL")
+if os.environ.get("REDIS_URL"):
+    REDIS_URL = os.environ.get("REDIS_URL")
+else:
+    if not REDIS_HOST:
+        raise Exception("REDIS_HOST is not set")
+    REDIS_URL = f"redis://{REDIS_USER}:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}"  # noqa
 # Check if redis url is ssl
 REDIS_SSL = REDIS_URL and "rediss" in REDIS_URL
 if REDIS_SSL:
@@ -233,25 +245,79 @@ if AWS_S3_ENDPOINT_URL and USE_MINIO:
     AWS_S3_URL_PROTOCOL = f"{parsed_url.scheme}:"
 # RabbitMQ connection settings
-RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")
+RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST")
 RABBITMQ_PORT = os.environ.get("RABBITMQ_PORT", "5672")
 RABBITMQ_USER = os.environ.get("RABBITMQ_USER", "guest")
 RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD", "guest")
 RABBITMQ_VHOST = os.environ.get("RABBITMQ_VHOST", "/")
-AMQP_URL = os.environ.get("AMQP_URL")
+if os.environ.get("AMQP_URL"):
+    AMQP_URL = os.environ.get("AMQP_URL")
+else:
+    if not RABBITMQ_HOST:
+        raise Exception("RABBITMQ_HOST is not set")
+    AMQP_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VHOST}"
 # Celery Configuration
-if AMQP_URL:
-    CELERY_BROKER_URL = AMQP_URL
-else:
-    CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VHOST}"
+CELERY_BROKER_URL = AMQP_URL
+CELERY_RESULT_BACKEND = REDIS_URL
 CELERY_TIMEZONE = TIME_ZONE
 CELERY_TASK_SERIALIZER = "json"
 CELERY_RESULT_SERIALIZER = "json"
 CELERY_ACCEPT_CONTENT = ["application/json"]
+CELERY_RESULT_EXPIRES = int(
+    os.environ.get("CELERY_RESULT_EXPIRES", 60 * 60 * 24)
+)  # Expire results after 24 hours
+QUEUE_SUFFIX = os.environ.get("QUEUE_SUFFIX", "plane")
+TASK_HIGH_QUEUE = f"{QUEUE_SUFFIX}_high"
+TASK_DEFAULT_QUEUE = f"{QUEUE_SUFFIX}_default"
+TASK_LOW_QUEUE = f"{QUEUE_SUFFIX}_low"
+TASK_SCHEDULER_QUEUE = f"{QUEUE_SUFFIX}_schedule"
+TASK_NOTIFICATION_QUEUE = f"{QUEUE_SUFFIX}_notification"
+# Define queues with their exchanges
+CELERY_TASK_QUEUES = (
+    Queue(
+        TASK_HIGH_QUEUE,
+        Exchange("high_exchange", type="direct"),
+        routing_key="high.#",
+        queue_arguments={"x-max-priority": 10, "x-queue-mode": "lazy"},
+    ),
+    Queue(
+        TASK_DEFAULT_QUEUE,
+        Exchange("default_exchange", type="direct"),
+        routing_key="default.#",
+        queue_arguments={"x-max-priority": 5, "x-queue-mode": "lazy"},
+    ),
+    Queue(
+        TASK_LOW_QUEUE,
+        Exchange("low_exchange", type="direct"),
+        routing_key="low.#",
+        queue_arguments={"x-max-priority": 1, "x-queue-mode": "lazy"},
+    ),
+    Queue(
+        TASK_SCHEDULER_QUEUE,
+        Exchange("scheduled_exchange", type="direct"),
+        routing_key="scheduled.#",
+        queue_arguments={"x-max-priority": 3, "x-queue-mode": "lazy"},
+    ),
+    Queue(
+        TASK_NOTIFICATION_QUEUE,
+        Exchange("notifications_exchange", type="direct"),
+        routing_key="notifications.#",
+        queue_arguments={"x-max-priority": 4, "x-queue-mode": "lazy"},
+    ),
+)
+# Default queue settings
+CELERY_TASK_DEFAULT_QUEUE = "default"
+CELERY_TASK_DEFAULT_EXCHANGE = "default_exchange"
+CELERY_TASK_DEFAULT_ROUTING_KEY = "default.#"
 CELERY_IMPORTS = (
     # scheduled tasks
     "plane.bgtasks.issue_automation_task",
@@ -297,16 +363,10 @@ GITHUB_ACCESS_TOKEN = os.environ.get("GITHUB_ACCESS_TOKEN", False)
 ANALYTICS_SECRET_KEY = os.environ.get("ANALYTICS_SECRET_KEY", False)
 ANALYTICS_BASE_API = os.environ.get("ANALYTICS_BASE_API", False)
 # Posthog settings
 POSTHOG_API_KEY = os.environ.get("POSTHOG_API_KEY", False)
 POSTHOG_HOST = os.environ.get("POSTHOG_HOST", False)
 # instance key
 INSTANCE_KEY = os.environ.get(
     "INSTANCE_KEY", "ae6517d563dfc13d8270bd45cf17b08f70b37d989128a9dab46ff687603333c3"
 )
 # Skip environment variable configuration
 SKIP_ENV_VAR = os.environ.get("SKIP_ENV_VAR", "1") == "1"
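
A closing note on the queue topology: CELERY_TASK_QUEUES declares five named queues over direct exchanges, each with x-max-priority (RabbitMQ then honors per-message priorities up to that cap) and x-queue-mode: lazy (messages are paged to disk rather than held in RAM, which suits long backlogs). The fallback CELERY_TASK_DEFAULT_QUEUE is the plain "default" queue, not the suffixed plane_default, so it only catches tasks that somehow miss an explicit queue=. Declaring queues runs nothing by itself; each queue needs at least one consuming worker, and sends can still override both queue and priority. A sketch under these settings (queue names assume the default QUEUE_SUFFIX of "plane"; the worker split is an assumed deployment layout, not part of this diff):

    # Pin workers to queues, e.g. one process or container per line:
    #   celery -A plane worker -Q plane_high,plane_default -O fair --prefetch-multiplier=1 -l INFO
    #   celery -A plane worker -Q plane_low,plane_schedule,plane_notification -l INFO

    from plane.bgtasks.issue_activities_task import issue_activity

    # The decorator routes this to plane_high; priority is honored up to
    # the queue's x-max-priority=10 (argument values are placeholders)
    issue_activity.apply_async(kwargs={"type": "issue.activity.created"}, priority=9)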