Compare commits

...

6 Commits

Author SHA1 Message Date
sriram veeraghanta
c7f48d4a65 fix:adding mq configuration and testing message passing 2024-08-27 16:07:19 +05:30
sriram veeraghanta
4fbb015d79 fix:adding mq configuration and testing message passing 2024-08-27 16:07:02 +05:30
sriram veeraghanta
06fad7494b Merge branch 'preview' of github.com:makeplane/plane into feat-notification-worker 2024-08-26 15:39:38 +05:30
sriram veeraghanta
8fe7765c3c fix: message broker setup on django 2024-08-16 14:21:04 +05:30
sriram veeraghanta
c0aa862642 Merge branch 'preview' of github.com:makeplane/plane into feat-notification-worker 2024-08-16 13:46:36 +05:30
sriram veeraghanta
da91bf3cdc fix: adding new service for notification handling 2024-08-02 19:46:36 +05:30
22 changed files with 160 additions and 11 deletions

View File

@@ -15,19 +15,25 @@ POSTGRES_DB="plane"
POSTGRES_PORT=5432
DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
# Redis Settings
REDIS_HOST="plane-redis"
REDIS_PORT="6379"
REDIS_URL="redis://${REDIS_HOST}:6379/"
# RabbitMQ Settings
RABBITMQ_HOST="plane-mq"
RABBITMQ_PORT="5672"
RABBITMQ_USER="plane"
RABBITMQ_PASSWORD="plane"
RABBITMQ_VHOST="plane"
# AWS Settings
AWS_REGION=""
AWS_ACCESS_KEY_ID="access-key"
AWS_SECRET_ACCESS_KEY="secret-key"
AWS_S3_ENDPOINT_URL="http://plane-minio:9000"
# Changing this requires change in the nginx.conf for uploads if using minio setup
AWS_S3_BUCKET_NAME="uploads"
AWS_S3_BUCKET_NAME="uploads" # Changing this requires change in the nginx.conf for uploads if using minio setup
# Maximum file upload limit
FILE_SIZE_LIMIT=5242880

View File

@@ -13,6 +13,7 @@ from django.utils import timezone
from plane.app.serializers import IssueActivitySerializer
from plane.bgtasks.notification_task import notifications
from plane.bgtasks.notifier.issue import notify_issue_activity
# Module imports
from plane.db.models import (
@@ -1737,6 +1738,15 @@ def issue_activity(
)
if notification:
notify_issue_activity.delay(
"NOTIFY_ISSUE_ACTIVITY",
json.dumps(
IssueActivitySerializer(
issue_activities_created, many=True
).data,
cls=DjangoJSONEncoder,
),
)
notifications.delay(
type=type,
issue_id=issue_id,

View File

@@ -0,0 +1 @@
from .issue import notify_issue_activity

View File

@@ -0,0 +1,23 @@
import pika
from django.conf import settings
from celery import shared_task
def publish_issue_activity(queue_name, message):
    """Publish *message* to the RabbitMQ queue *queue_name*.

    Args:
        queue_name: Target queue (declared durable so messages survive a
            broker restart).
        message: Serialized payload (str/bytes) to publish.

    Connects using the broker URL from Django settings rather than a
    hard-coded localhost address, so the publisher works in any
    environment (the original pinned ``127.0.0.1``).
    """
    connection = pika.BlockingConnection(
        pika.URLParameters(settings.CELERY_BROKER_URL)
    )
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=True)
        # Default exchange routes directly to the queue named by routing_key.
        channel.basic_publish(exchange="", routing_key=queue_name, body=message)
    finally:
        # Close even if declare/publish raises, so connections are not leaked.
        connection.close()
@shared_task
def notify_issue_activity(queue_name, message):
    """Celery task: hand an issue-activity payload off to the MQ publisher."""
    return publish_issue_activity(queue_name, message)
if __name__ == "__main__":
    # Manual smoke test: publish one message directly, bypassing Celery.
    publish_issue_activity("NOTIFY_ISSUE_ACTIVITY", "Hello World")

View File

@@ -14,6 +14,7 @@ from django.db.models import Q
from plane.utils.html_processor import strip_tags
from .project import ProjectBaseModel
from plane.bgtasks.notifier import notify_issue_activity
def get_default_properties():

View File

@@ -254,19 +254,19 @@ if AWS_S3_ENDPOINT_URL and USE_MINIO:
AWS_S3_CUSTOM_DOMAIN = f"{parsed_url.netloc}/{AWS_STORAGE_BUCKET_NAME}"
AWS_S3_URL_PROTOCOL = f"{parsed_url.scheme}:"
# RabbitMQ connection settings
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = os.environ.get("RABBITMQ_PORT", "5672")
RABBITMQ_USER = os.environ.get("RABBITMQ_USER", "guest")
RABBITMQ_PASSWORD = os.environ.get("RABBITMQ_PASSWORD", "guest")
RABBITMQ_VHOST = os.environ.get("RABBITMQ_VHOST", "/")
# Celery Configuration
CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VHOST}"
CELERY_TIMEZONE = TIME_ZONE
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["application/json"]
if REDIS_SSL:
redis_url = os.environ.get("REDIS_URL")
broker_url = f"{redis_url}?ssl_cert_reqs={ssl.CERT_NONE.name}&ssl_ca_certs={certifi.where()}"
CELERY_BROKER_URL = broker_url
else:
CELERY_BROKER_URL = REDIS_URL
CELERY_IMPORTS = (
# scheduled tasks
"plane.bgtasks.issue_automation_task",

View File

@@ -17,6 +17,7 @@ django-cors-headers==4.3.1
# celery
celery==5.4.0
django_celery_beat==2.6.0
pika==1.3.2
# file serve
whitenoise==6.6.0
# fake data

6
notifier/.env.example Normal file
View File

@@ -0,0 +1,6 @@
# RabbitMQ Settings
RABBITMQ_HOST="plane-mq"
RABBITMQ_PORT="5672"
RABBITMQ_USER="plane"
RABBITMQ_PASSWORD="plane"
RABBITMQ_VHOST="plane"

View File

0
notifier/README.md Normal file
View File

View File

@@ -0,0 +1 @@
from .home import router as home_router

View File

@@ -0,0 +1,9 @@
from fastapi import APIRouter
# Router collecting the notifier service's public HTTP routes.
router = APIRouter()
@router.get("/")
def home():
    """Root endpoint: return a static greeting (doubles as a liveness check)."""
    greeting = {"message": "Hello, World!"}
    return greeting

19
notifier/engine/app.py Normal file
View File

@@ -0,0 +1,19 @@
import asyncio
from fastapi import FastAPI
from .api import home_router
from .tasks import issue_activity_consumer
from .settings import settings
# FastAPI application for the notifier service; metadata comes from the
# shared Settings object.
app = FastAPI(
    title=settings.PROJECT_NAME,
    description=settings.PROJECT_DESCRIPTION,
    version=settings.PROJECT_VERSION,
)
app.include_router(home_router, tags=["home"])
# NOTE(review): @app.on_event is deprecated in FastAPI in favor of lifespan
# handlers — consider migrating.
@app.on_event("startup")
async def startup_event():
    """Launch the RabbitMQ consumer as a background task at app startup."""
    print("Starting up")
    # Fire-and-forget: the task handle is not retained, so consumer failures
    # are not observed here — TODO confirm that is acceptable.
    asyncio.create_task(issue_activity_consumer())

View File

@@ -0,0 +1,4 @@
from celery import Celery
from .settings import settings
# Celery client for the notifier; broker URL is the amqp:// URL assembled
# in CelerySettings.
celery = Celery("tasks", broker=settings.CELERY_BROKER_URL)

View File

@@ -0,0 +1,3 @@
from .common import Settings
# Singleton settings instance shared across the notifier package.
settings = Settings()

View File

@@ -0,0 +1,17 @@
import os
class CelerySettings:
    """Message-queue and Celery configuration read from the environment."""

    # RabbitMQ connection pieces; defaults target a local dev broker.
    RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
    RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD", "guest")
    RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
    RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", 5672)
    RABBITMQ_VHOST = os.getenv("RABBITMQ_VHOST", "plane")

    # Celery broker URL assembled from the pieces above.
    CELERY_BROKER_URL = (
        f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASSWORD}"
        f"@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VHOST}"
    )

    # Celery serialization / timezone settings.
    CELERY_ACCEPT_CONTENT = ["json"]
    CELERY_TASK_SERIALIZER = "json"
    CELERY_RESULT_SERIALIZER = "json"
    CELERY_TIMEZONE = "UTC"
    CELERY_ENABLE_UTC = True

View File

@@ -0,0 +1,16 @@
import os
# third-party imports
# from pydantic_settings import BaseSettings
# local imports
from .celery import CelerySettings
from .postgres import PostgresSettings
class Settings(CelerySettings, PostgresSettings):
    """Aggregated notifier configuration: Celery + Postgres + app metadata."""

    PROJECT_NAME: str = "Notification Engine"
    PROJECT_DESCRIPTION: str = "A simple notification service"
    PROJECT_VERSION: str = "0.1.0"
    SECRET_KEY: str = os.environ.get("SECRET_KEY", "123456789")
    # Parse DEBUG into a real bool: os.environ.get always returns a string
    # when the variable is set, and any non-empty string (even "False") is
    # truthy, so the original `os.environ.get("DEBUG", False)` could never
    # be falsy once DEBUG was defined. Default remains False.
    DEBUG: bool = os.environ.get("DEBUG", "false").strip().lower() in (
        "1",
        "true",
        "yes",
        "on",
    )

View File

@@ -0,0 +1,9 @@
import os
class PostgresSettings:
    """PostgreSQL connection settings read from the environment."""

    PG_HOST: str = os.environ.get("POSTGRES_HOST", "localhost")
    # Cast to int so the annotation holds even when the env var is set
    # (os.environ.get returns a string in that case).
    PG_PORT: int = int(os.environ.get("POSTGRES_PORT", 5432))
    PG_USER: str = os.environ.get("POSTGRES_USER", "postgres")
    PG_PASSWORD: str = os.environ.get("POSTGRES_PASSWORD", "postgres")
    PG_NAME: str = os.environ.get("POSTGRES_DB", "notification")

View File

@@ -0,0 +1 @@
from .issue_activity_consumer import issue_activity_consumer

View File

@@ -0,0 +1,13 @@
import aio_pika
from ..settings import settings
async def issue_activity_consumer():
    """Consume the NOTIFY_ISSUE_ACTIVITY queue forever, printing each body.

    Connects to the broker configured in settings, declares the durable
    queue, and acknowledges each message after processing (via
    ``message.process()``).
    """
    conn = await aio_pika.connect_robust(settings.CELERY_BROKER_URL)
    chan = await conn.channel()
    activity_queue = await chan.declare_queue(
        "NOTIFY_ISSUE_ACTIVITY", durable=True
    )
    async with activity_queue.iterator() as incoming:
        async for msg in incoming:
            async with msg.process():
                body_text = msg.body.decode()
                print(f"Received message: {body_text}")

6
notifier/main.py Normal file
View File

@@ -0,0 +1,6 @@
from engine.app import app
if __name__ == "__main__":
    # Local entry point: serve the FastAPI app on all interfaces, port 8080.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

View File

@@ -0,0 +1,3 @@
fastapi==0.111.1
aio-pika==9.4.3
pydantic==2.8.2