Case study: строим SaaS от идеи до production с подписками и командной работой
Case study: строим SaaS от идеи до production.
💡 Правило: Начинайте просто, масштабируйте по мере роста.
SaaS (Software as a Service) — модель распространения программного обеспечения, где приложение размещается на серверах провайдера и предоставляется пользователям через интернет по подписке.
Примеры SaaS:
В этой главе мы создадим ProjectHub — SaaS-платформу для управления проектами с:

| Компонент | Назначение |
|---|---|
| Web App | Django + DRF, обрабатывает HTTP-запросы |
| Worker | Celery worker, выполняет фоновые задачи |
| Beat | Celery beat, планирует периодические задачи |
| PostgreSQL | Основное хранилище данных |
| Redis | Кэш, брокер сообщений для Celery, сессии |
| Load Balancer | Распределение нагрузки, SSL termination |
Multi-tenancy — архитектурный паттерн, при котором одно приложение обслуживает несколько организаций (тенантов), изолируя их данные друг от друга.
| Подход | Плюсы | Минусы | Когда использовать |
|---|---|---|---|
| Database per tenant | Полная изоляция, бэкапы | Дорого, сложно масштабировать | Enterprise, compliance |
| Schema per tenant | Изоляция, легче бэкапить | Сложные миграции | Средний бизнес |
| Row-level isolation | Дёшево, просто | Риск утечки данных | Startups, SMB ← наш выбор |
Каждая таблица имеет поле organization_id. Все запросы фильтруются по организации.
# models.py
from django.db import models
from django.conf import settings
class Organization(models.Model):
"""
Организация (тенант) — основная единица изоляции.
Каждый пользователь принадлежит одной или нескольким организациям.
"""
name = models.CharField(max_length=200)
domain = models.CharField(max_length=100, unique=True,
help_text="subdomain.example.com")
plan = models.ForeignKey('Plan', on_delete=models.PROTECT)
created_at = models.DateTimeField(auto_now_add=True)
is_active = models.BooleanField(default=True)
def __str__(self):
return self.name
class Meta:
db_table = 'organizations'
indexes = [
models.Index(fields=['domain']), # Быстрый поиск по домену
models.Index(fields=['is_active', 'created_at']),
]
class Project(models.Model):
"""
Проект принадлежит организации.
Все запросы к проектам должны фильтроваться по organization.
"""
organization = models.ForeignKey(
Organization,
on_delete=models.CASCADE,
related_name='projects' # org.projects.all()
)
name = models.CharField(max_length=200)
description = models.TextField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'projects'
ordering = ['-created_at']
indexes = [
models.Index(fields=['organization', '-created_at']), # org.projects.all()
models.Index(fields=['organization', 'name']), # Поиск по имени
]
constraints = [
# Уникальность имени проекта в рамках организации
models.UniqueConstraint(
fields=['organization', 'name'],
name='unique_project_per_org'
)
]Проблема: Как гарантировать, что запросы всегда фильтруются по организации?
Решение: Кастомный менеджер, который автоматически добавляет фильтр.
# managers.py
from django.db import models
# Глобальное хранилище текущей организации (thread-local)
_current_organization = None
def get_current_organization():
"""Получить текущую организацию из контекста"""
return _current_organization
def set_current_organization(org):
"""Установить текущую организацию"""
global _current_organization
_current_organization = org
class TenantManager(models.Manager):
"""
Менеджер, который автоматически фильтрует queryset по текущей организации.
Использование:
Project.objects.all() # Вернёт проекты ТОЛЬКО текущей организации
"""
def get_queryset(self):
org = get_current_organization()
if not org:
# Если организация не установлена — возвращаем пустой queryset
# или выбрасываем ошибку (в зависимости от требований)
return self.get_queryset().none()
return super().get_queryset().filter(organization=org)
class TenantModel(models.Model):
"""
Абстрактная базовая модель для всех tenant-aware моделей.
Пример использования:
class Project(TenantModel):
name = models.CharField(max_length=200)
# Теперь Project.objects.all() автоматически фильтруется
"""
organization = models.ForeignKey('Organization', on_delete=models.CASCADE)
objects = TenantManager() # Фильтрованный менеджер
all_objects = models.Manager() # Нефильтрованный (для админки, отчётов)
class Meta:
abstract = TrueВопрос: Откуда берётся get_current_organization()?
Ответ: Из middleware, который извлекает организацию из запроса (домен/subdomain).
# middleware.py
from django.http import Http404
from .models import Organization
from .managers import set_current_organization
_current_organization = None
def get_current_organization():
return _current_organization
class TenantMiddleware:
"""
Middleware извлекает организацию из домена/subdomain и устанавливает контекст.
Как работает:
1. request.get_host() → 'acme.example.com'
2. Извлекаем subdomain → 'acme'
3. Находим Organization с domain='acme'
4. Устанавливаем в глобальный контекст
5. Все TenantManager запросы автоматически фильтруются
"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
domain = self.extract_domain(request)
if domain:
try:
org = Organization.objects.get(domain=domain, is_active=True)
set_current_organization(org)
request.organization = org # Доступно в views
except Organization.DoesNotExist:
# Организация не найдена — публичный маршрут или 404
set_current_organization(None)
request.organization = None
response = self.get_response(request)
return response
def extract_domain(self, request):
"""
Извлекает домен из запроса.
Примеры:
'acme.example.com' → 'acme' (subdomain)
'example.com' → 'example.com' (root domain)
"""
host = request.get_host().lower()
parts = host.split('.')
# Если 3+ части — это subdomain
if len(parts) >= 3:
return parts[0] # 'acme'
# Иначе — root domain
return host# settings.py
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
# ...
'apps.saas.middleware.TenantMiddleware', # ← Наш middleware
'django.middleware.common.CommonMiddleware',
# ...
]SaaS зарабатывает на рекуррентных платежах — регулярной оплате подписки.
Ключевые метрики:
Создадим гибкую систему тарифов:
# models.py
from decimal import Decimal
from django.db import models
class Plan(models.Model):
"""
Тарифный план.
Примеры:
Free: $0/мес, 3 проекта, 1GB хранилище
Pro: $29/мес, 50 проектов, 100GB, приоритетная поддержка
Enterprise: $299/мес, безлимит, SLA, dedicated manager
"""
name = models.CharField(max_length=100) # 'Free', 'Pro', 'Enterprise'
slug = models.SlugField(unique=True) # 'free', 'pro', 'enterprise'
price = models.DecimalField(
max_digits=10,
decimal_places=2,
default=0,
help_text="Цена в месяц"
)
currency = models.CharField(max_length=3, default='USD')
# Ограничения плана
features = models.JSONField(
default=list,
help_text="Список фич: ['analytics', 'api_access', 'priority_support']"
)
max_projects = models.IntegerField(default=5)
max_users = models.IntegerField(default=3)
api_calls_limit = models.IntegerField(default=1000)
storage_limit_mb = models.IntegerField(default=100)
# Статус
is_active = models.BooleanField(default=True)
# Stripe integration
stripe_price_id = models.CharField(
max_length=255,
blank=True,
help_text="ID цены в Stripe (price_...)"
)
created_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return f"{self.name} - ${self.price}/мес"
class Meta:
db_table = 'plans'
ordering = ['price']
verbose_name = 'Тарифный план'
verbose_name_plural = 'Тарифные планы'# models.py
from django.utils import timezone
class Subscription(models.Model):
"""
Подписка организации на тарифный план.
Жизненный цикл подписки:
1. trialing → trial period (14 дней бесплатно)
2. active → оплата прошла успешно
3. past_due → оплата не прошла, есть grace period
4. canceled → подписка отменена
5. unpaid → несколько неудачных попыток оплаты
"""
STATUS_CHOICES = [
('trialing', '🧪 Trialing'),
('active', '✅ Active'),
('incomplete', '⏳ Incomplete'),
('incomplete_expired', '❌ Incomplete Expired'),
('past_due', '⚠️ Past Due'),
('canceled', '🚫 Canceled'),
('unpaid', '💸 Unpaid'),
]
organization = models.OneToOneField(
'Organization',
on_delete=models.CASCADE,
related_name='subscription'
)
plan = models.ForeignKey(
Plan,
on_delete=models.PROTECT, # Нельзя удалить план с активными подписками
related_name='subscriptions'
)
# Stripe identifiers
stripe_subscription_id = models.CharField(max_length=255, unique=True, db_index=True)
stripe_customer_id = models.CharField(max_length=255, db_index=True)
# Статус и периоды
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='trialing')
current_period_start = models.DateTimeField()
current_period_end = models.DateTimeField()
# Отмена
cancel_at_period_end = models.BooleanField(default=False)
canceled_at = models.DateTimeField(null=True, blank=True)
# Trial
trial_end = models.DateTimeField(null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
def __str__(self):
return f"{self.organization.name} — {self.plan.name} ({self.status})"
@property
def is_active(self):
"""Подписка активна, если статус active или trialing"""
return self.status in ['active', 'trialing']
@property
def days_until_renewal(self):
"""Сколько дней до следующего списания"""
delta = self.current_period_end - timezone.now()
return max(0, delta.days)
@property
def is_trialing(self):
return self.status == 'trialing'
@property
def trial_days_left(self):
if not self.trial_end:
return 0
delta = self.trial_end - timezone.now()
return max(0, delta.days)
class Meta:
db_table = 'subscriptions'
verbose_name = 'Подписка'
verbose_name_plural = 'Подписки'
indexes = [
models.Index(fields=['status', 'current_period_end']), # Поиск активных
models.Index(fields=['stripe_customer_id']), # Webhook lookup
]Stripe — платёжная платформа для интернет-бизнеса.
Ключевые понятия:
# services/stripe_service.py
import stripe
from django.conf import settings
from django.utils import timezone
from datetime import timedelta
from ..models import Subscription, Plan
stripe.api_key = settings.STRIPE_SECRET_KEY
class StripeService:
"""
Сервис для работы со Stripe.
Основные операции:
1. Создание customer
2. Создание подписки
3. Обработка webhook событий
"""
@staticmethod
def create_customer(organization, email):
"""
Создаёт customer в Stripe.
Customer — это клиент, который может иметь подписки и платежные методы.
"""
customer = stripe.Customer.create(
email=email,
name=organization.name,
metadata={
'organization_id': organization.id,
'domain': organization.domain
}
)
return customer
@staticmethod
def create_subscription(organization, plan: Plan, email: str):
"""
Создаёт подписку в Stripe.
Процесс:
1. Создаём Stripe customer
2. Создаём Stripe subscription с trial period
3. Сохраняем в БД
"""
# 1. Создаём Stripe customer
customer = stripe.Customer.create(
email=email,
name=organization.name,
metadata={'organization_id': organization.id}
)
# 2. Создаём подписку
subscription = stripe.Subscription.create(
customer=customer.id,
items=[{'price': plan.stripe_price_id}],
trial_period_days=14, # 14 дней trial
payment_behavior='default_incomplete',
expand=['latest_invoice.payment_intent'] # Для обработки оплаты
)
# 3. Сохраняем в БД
sub = Subscription.objects.create(
organization=organization,
plan=plan,
stripe_subscription_id=subscription.id,
stripe_customer_id=customer.id,
status=subscription.status,
current_period_start=timezone.make_aware(
timezone.datetime.fromtimestamp(subscription.current_period_start)
),
current_period_end=timezone.make_aware(
timezone.datetime.fromtimestamp(subscription.current_period_end)
),
trial_end=timezone.make_aware(
timezone.datetime.fromtimestamp(subscription.trial_end)
) if subscription.trial_end else None
)
return sub
@staticmethod
def handle_webhook_event(event):
"""
Обрабатывает webhook события от Stripe.
События:
- customer.subscription.updated — подписка изменена
- customer.subscription.deleted — подписка отменена
- invoice.payment_succeeded — оплата прошла
- invoice.payment_failed — оплата не прошла
"""
event_type = event['type']
data = event['data']['object']
if event_type == 'customer.subscription.updated':
StripeService._update_subscription(data)
elif event_type == 'customer.subscription.deleted':
StripeService._cancel_subscription(data)
elif event_type == 'invoice.payment_succeeded':
StripeService._handle_payment_succeeded(data)
elif event_type == 'invoice.payment_failed':
StripeService._handle_payment_failed(data)
@staticmethod
def _update_subscription(stripe_sub):
"""Обновляет подписку при изменении в Stripe"""
try:
sub = Subscription.objects.get(stripe_subscription_id=stripe_sub['id'])
sub.status = stripe_sub['status']
sub.current_period_start = timezone.make_aware(
timezone.datetime.fromtimestamp(stripe_sub['current_period_start'])
)
sub.current_period_end = timezone.make_aware(
timezone.datetime.fromtimestamp(stripe_sub['current_period_end'])
)
sub.cancel_at_period_end = stripe_sub.get('cancel_at_period_end', False)
sub.save()
except Subscription.DoesNotExist:
# Подписка не найдена — создаём новую
pass
@staticmethod
def _cancel_subscription(stripe_sub):
"""Отменяет подписку"""
Subscription.objects.filter(
stripe_subscription_id=stripe_sub['id']
).update(status='canceled', canceled_at=timezone.now())
@staticmethod
def _handle_payment_succeeded(invoice):
"""Оплата прошла успешно"""
# Можно отправить благодарственное письмо
pass
@staticmethod
def _handle_payment_failed(invoice):
"""Оплата не прошла"""
# Можно отправить уведомление пользователю
pass# views/webhooks.py
from django.views.decorators.csrf import csrf_exempt
from django.http import HttpResponse
from rest_framework.views import APIView
from rest_framework.response import Response
import stripe
class StripeWebhookView(APIView):
"""
Endpoint для получения webhook от Stripe.
Stripe отправляет POST запрос при событиях:
- payment_succeeded
- payment_failed
- subscription_updated
- subscription_deleted
"""
authentication_classes = [] # Нет аутентификации
permission_classes = [] # Нет проверок прав
@csrf_exempt # CSRF не нужен для webhook
def post(self, request):
payload = request.body
sig_header = request.META.get('HTTP_STRIPE_SIGNATURE')
try:
# Проверяем подпись Stripe
event = stripe.Webhook.construct_event(
payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
)
except (ValueError, stripe.error.SignatureVerificationError):
return Response({'error': 'Invalid payload'}, status=400)
# Обрабатываем событие
StripeService.handle_webhook_event(event)
return Response({'status': 'success'})SaaS для команд требует RBAC (Role-Based Access Control):
| Роль | Права |
|---|---|
| Owner | Полный доступ, billing, удаление организации |
| Admin | Управление проектами и участниками |
| Member | Создание и редактирование проектов |
| Viewer | Только просмотр |
# models.py
from django.contrib.auth.models import User
class OrganizationMember(models.Model):
"""
Участник организации.
Связывает User с Organization и определяет роль.
"""
ROLE_CHOICES = [
('owner', '👑 Owner'),
('admin', '🛡️ Admin'),
('member', '👤 Member'),
('viewer', '👁️ Viewer'),
]
# Матрица разрешений
PERMISSION_MATRIX = {
'owner': ['view', 'create', 'edit', 'delete', 'manage_members', 'billing'],
'admin': ['view', 'create', 'edit', 'delete', 'manage_members'],
'member': ['view', 'create', 'edit'],
'viewer': ['view'],
}
organization = models.ForeignKey(
'Organization',
on_delete=models.CASCADE,
related_name='members'
)
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name='organization_memberships'
)
role = models.CharField(
max_length=20,
choices=ROLE_CHOICES,
default='member'
)
invited_at = models.DateTimeField(auto_now_add=True)
invited_by = models.ForeignKey(
User,
on_delete=models.SET_NULL,
null=True,
related_name='invitations_sent'
)
is_active = models.BooleanField(default=True)
class Meta:
db_table = 'organization_members'
unique_together = ['organization', 'user'] # Один раз в организации
indexes = [
models.Index(fields=['organization', 'role']),
models.Index(fields=['user', 'is_active']),
]
def __str__(self):
return f"{self.user.email} — {self.organization.name} ({self.role})"
def has_permission(self, permission):
"""Проверка конкретного разрешения"""
return permission in self.PERMISSION_MATRIX.get(self.role, [])
@property
def can_manage_members(self):
"""Может приглашать/удалять участников"""
return self.role in ['owner', 'admin']
@property
def can_access_billing(self):
"""Может управлять подпиской"""
return self.role == 'owner'
@property
def can_delete_organization(self):
"""Только owner может удалить организацию"""
return self.role == 'owner'# models.py
from datetime import timedelta
from uuid import uuid4
class Invitation(models.Model):
"""
Приглашение в организацию.
Процесс:
1. Admin создаёт Invitation с email
2. Система отправляет email с ссылкой
3. Пользователь переходит по ссылке
4. Invitation → OrganizationMember
"""
STATUS_CHOICES = [
('pending', '⏳ Pending'),
('accepted', '✅ Accepted'),
('declined', '❌ Declined'),
('expired', '⌛ Expired'),
]
organization = models.ForeignKey(
'Organization',
on_delete=models.CASCADE,
related_name='invitations'
)
email = models.EmailField()
role = models.CharField(
max_length=20,
choices=OrganizationMember.ROLE_CHOICES,
default='member'
)
token = models.UUIDField(unique=True, editable=False)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
invited_by = models.ForeignKey(User, on_delete=models.CASCADE, related_name='sent_invitations')
invited_at = models.DateTimeField(auto_now_add=True)
expires_at = models.DateTimeField()
accepted_at = models.DateTimeField(null=True, blank=True)
class Meta:
db_table = 'organization_invitations'
indexes = [
models.Index(fields=['token']), # Поиск по токену
models.Index(fields=['email', 'status']), # Поиск по email
]
def save(self, *args, **kwargs):
"""Автозаполнение token и expires_at"""
if not self.token:
self.token = uuid4()
if not self.expires_at:
self.expires_at = timezone.now() + timedelta(days=7) # 7 дней
super().save(*args, **kwargs)
def accept(self, user):
"""
Принять приглашение.
Args:
user: User, который принял приглашение
Raises:
ValueError: Если приглашение уже использовано или истекло
"""
if self.status != 'pending':
raise ValueError("Invitation already used")
if timezone.now() > self.expires_at:
self.status = 'expired'
self.save()
raise ValueError("Invitation expired")
# Создаём членство
OrganizationMember.objects.create(
organization=self.organization,
user=user,
role=self.role,
invited_by=self.invited_by
)
self.status = 'accepted'
self.accepted_at = timezone.now()
self.save()# models.py
from django.db import models
from django.utils import timezone
class Usage(models.Model):
"""
Метрики использования организации.
Примеры метрик:
- api_calls — количество API запросов
- storage_mb — использовано хранилища
- emails_sent — отправлено email
- projects_count — количество проектов
"""
organization = models.ForeignKey(
'Organization',
on_delete=models.CASCADE,
related_name='usage_metrics'
)
metric = models.CharField(max_length=100)
quantity = models.BigIntegerField(default=0)
period = models.DateField() # Группировка по дням
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'usage_metrics'
unique_together = ['organization', 'metric', 'period']
indexes = [
models.Index(fields=['organization', 'period']),
models.Index(fields=['metric', 'period']),
]
@classmethod
def increment(cls, organization, metric, amount=1):
"""
Атомарное увеличение счётчика.
Использование:
Usage.increment(org, 'api_calls', 1)
"""
today = timezone.now().date()
obj, created = cls.objects.get_or_create(
organization=organization,
metric=metric,
period=today,
defaults={'quantity': amount}
)
if not created:
# F() для атомарного обновления
cls.objects.filter(pk=obj.pk).update(
quantity=models.F('quantity') + amount
)
@classmethod
def get_current_usage(cls, organization):
"""Получить текущее использование за сегодня"""
today = timezone.now().date()
return cls.objects.filter(
organization=organization,
period=today
).values('metric', 'quantity')# middleware/rate_limiter.py
from django.http import JsonResponse
from django.utils import timezone
from ..models import Usage, UsageLimit, OrganizationMember
class UsageRateLimiter:
"""
Middleware ограничивает использование API на основе тарифа.
Как работает:
1. Перехватывает запрос к /api/
2. Проверяет текущее использование
3. Если превышен лимит — возвращает 429
4. Иначе инкрементит счётчик
"""
def __init__(self, get_response):
self.get_response = get_response
self.exempt_paths = ['/api/health', '/api/webhooks/stripe']
def __call__(self, request):
# Пропускаем health checks
if request.path in self.exempt_paths:
return self.get_response(request)
# Проверяем только авторизованные запросы
if hasattr(request, 'organization') and request.user.is_authenticated:
org = request.organization
if request.path.startswith('/api/'):
limit = self._get_limit(org, 'api_calls')
current = self._get_current_usage(org, 'api_calls')
if current >= limit:
return JsonResponse(
{
'error': 'Usage limit exceeded',
'limit': limit,
'current': current,
'upgrade_url': '/billing/upgrade'
},
status=429
)
# Инкремент счётчика
Usage.increment(org, 'api_calls', 1)
return self.get_response(request)
def _get_limit(self, organization, metric):
"""Получить лимит из тарифа"""
plan = organization.plan
try:
limit = UsageLimit.objects.get(plan=plan, metric=metric)
return limit.limit
except UsageLimit.DoesNotExist:
# Лимит по умолчанию из плана
return getattr(plan, f'{metric}_limit', 1000)
def _get_current_usage(self, organization, metric):
"""Получить текущее использование"""
today = timezone.now().date()
try:
usage = Usage.objects.get(
organization=organization,
metric=metric,
period=today
)
return usage.quantity
except Usage.DoesNotExist:
return 0Проблема: Некоторые операции занимают время:
Решение: Выполнять в фоне, не блокируя запрос.

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0' # Брокер
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Результаты
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # 30 минут максимум# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from django.conf import settings
from datetime import timedelta
from django.utils import timezone
from .models import Subscription, Invitation, Usage
@shared_task(bind=True, max_retries=3)
def send_invitation_email(self, invitation_id):
"""
Отправка приглашения по email.
retry: Если email не отправился — повторить 3 раза
"""
from .models import Invitation
try:
invitation = Invitation.objects.get(id=invitation_id)
accept_url = f"{settings.FRONTEND_URL}/invite/{invitation.token}"
send_mail(
subject=f'Приглашение в {invitation.organization.name}',
message=f'''
Вас пригласили в организацию {invitation.organization.name}.
Роль: {invitation.role}
Примите приглашение: {accept_url}
Ссылка действительна до {invitation.expires_at}.
''',
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[invitation.email],
fail_silently=False,
)
except Exception as exc:
# Повторить через 5 минут
raise self.retry(exc, countdown=300)
@shared_task
def send_usage_warnings():
"""
Отправка предупреждений о приближении к лимитам.
Запускается ежедневно в 18:00.
"""
from .models import Organization
for org in Organization.objects.filter(is_active=True):
subscription = org.subscription
if not subscription or not subscription.is_active:
continue
limits = UsageLimit.objects.filter(plan=subscription.plan)
for limit in limits:
usage = Usage.objects.filter(
organization=org,
metric=limit.metric,
period=timezone.now().date()
).first()
# 80% от лимита — предупреждение
if usage and usage.quantity >= limit.limit * 0.8:
send_mail(
subject='⚠️ Предупреждение о лимите использования',
message=f'Вы использовали {usage.quantity} из {limit.limit} {limit.metric}',
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[org.admin_email],
fail_silently=True,
)
@shared_task
def cleanup_expired_invitations():
"""Очистка просроченных приглашений"""
from .models import Invitation
count = Invitation.objects.filter(
status='pending',
expires_at__lt=timezone.now()
).update(status='expired')
return f'Cleaned {count} expired invitations'
@shared_task
def reset_daily_usage():
"""
Сброс дневных лимитов.
Usage хранится по дням, поэтому "сброс" — это просто новая дата period.
"""
pass # Nothing to do, new day = new period# settings.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'send-usage-warnings': {
'task': 'apps.saas.tasks.send_usage_warnings',
'schedule': crontab(hour=18, minute=0), # Каждый день в 18:00
},
'cleanup-expired-invitations': {
'task': 'apps.saas.tasks.cleanup_expired_invitations',
'schedule': crontab(hour=3, minute=0), # Каждый день в 03:00
},
'reset-daily-usage': {
'task': 'apps.saas.tasks.reset_daily_usage',
'schedule': crontab(minute=0, hour=0), # Каждый день в полночь
},
}Поздравляем! Вы прошли путь от основ Django до архитектуры production-ready SaaS платформы.
| Навык | Применение |
|---|---|
| Multi-tenancy | Изоляция данных организаций |
| Subscription Billing | Stripe, рекуррентные платежи |
| RBAC | Роли, permissions, приглашения |
| Usage Tracking | Метрики, лимиты, rate limiting |
| Celery | Фоновые задачи, планировщик |
| REST API | DRF, serializers, permissions |
| Security | Throttling, audit logging |
| DevOps | Docker, CI/CD, monitoring |
🚀 Помните: Лучший код — это код, который решает проблемы пользователей, а не усложняет жизнь разработчикам. Keep it simple, scale when needed.
Удачи в построении следующего unicorn! 🦄
Вопросы ещё не добавлены
Вопросы для этой подтемы ещё не добавлены.