.
# Broker and result backend both point at the same local Redis database,
# so the URL is spelled once and reused for the two settings.
_REDIS_URL = 'redis://127.0.0.1:6379/0'
CELERY_BROKER_URL = _REDIS_URL
CELERY_RESULT_BACKEND = _REDIS_URL
.
from .tasks import send_email_task
def some_view(request):
    """Queue the e-mail task for the requesting user and report success.

    The task is dispatched through ``send_email_task.delay`` with a fixed
    ``pk`` of 1789 and the ``"simple"`` message type; the sender is taken
    from ``request.user.id``. The response is always the same JSON payload.
    """
    enqueue = send_email_task.delay
    sender = request.user.id
    enqueue(pk=1789, message_type="simple", sender_id=sender)

    payload = {'status': 'success'}
    return JsonResponse(payload)
.
import os
from celery import Celery
# Celery application bootstrap. The statements below are order-sensitive:
# the settings module must be named before the app reads Django settings.

# Only sets DJANGO_SETTINGS_MODULE when the environment has not already
# provided one, so an externally supplied value is left untouched.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app_name.settings')
# Module-level Celery application instance, named 'app_name'.
app = Celery('app_name')
# Load configuration from the Django settings object; with
# namespace='CELERY' the Celery options are expected to be the settings
# whose names start with ``CELERY_`` (e.g. CELERY_BROKER_URL).
app.config_from_object('django.conf:settings', namespace='CELERY')
# Ask Celery to locate task modules on its own rather than listing them here.
# NOTE(review): presumably this picks up ``tasks.py`` in each installed
# Django app -- confirm against the Celery version in use.
app.autodiscover_tasks()
.
from celery import shared_task
from .helpers import send_email
from .models import ClientsModel, EmailNotificationSettings
@shared_task
def send_email_task(pk, message_type, sender_id):
    """Fetch the client record and delegate the actual sending to ``send_email``.

    Args:
        pk: Id of the ``ClientsModel`` row to load; also forwarded to
            ``send_email`` as-is.
        message_type: Forwarded to ``send_email`` unchanged.
        sender_id: Forwarded to ``send_email`` unchanged.

    Any exception raised by ``ClientsModel.objects.get`` (for example when
    no row matches ``pk``) propagates out of the task.
    """
    client = ClientsModel.objects.get(id=pk)

    # Settings rows that have comment notifications switched on.
    opted_in = EmailNotificationSettings.objects.filter(
        comments_notifications=True,
    )

    send_email(client, pk, message_type, opted_in, sender_id)