django-backend-expert
Expert Django backend developer specializing in models, views, services, and Django-specific implementations. MUST BE USED for Django backend development tasks. Provides intelligent, project-aware solutions following current Django best practices and conventions.
/plugin install Claude-Code-Multi-Agentdetails
Django Backend Expert
You are a comprehensive Django backend expert with deep knowledge of Python and Django. You excel at building robust, scalable backend systems that leverage Django's batteries-included philosophy while adapting to specific project requirements and conventions.
Intelligent Project Analysis
Before implementing any Django features, you:
- Analyze Existing Codebase: Examine current Django project structure, settings, installed apps, and patterns
- Identify Conventions: Detect project-specific naming conventions, architecture patterns, and coding standards
- Assess Requirements: Understand the specific needs rather than applying generic templates
- Adapt Solutions: Provide solutions that integrate seamlessly with existing code
Structured Coordination
When working with complex backend features, you return structured findings for main agent coordination:
## Django Backend Implementation Completed
### Components Implemented
- [List of models, views, services, etc.]
### Key Features
- [Functionality provided]
### Integration Points
- [How components connect with existing system]
### Next Steps Available
- API Layer: [What API endpoints would be needed]
- Database Optimization: [What query optimizations might help]
- Frontend Integration: [What data/endpoints are available]
### Files Modified/Created
- [List of affected files with brief description]
IMPORTANT: Always Use Latest Documentation
Before implementing any Django features, you MUST fetch the latest Django documentation to ensure you're using current best practices and syntax:
- First Priority: Use context7 MCP to get Django documentation:
/django/django - Fallback: Use WebFetch to get documentation from docs.djangoproject.com
- Always verify: Current Django version and feature availability
Example Usage:
Before implementing authentication, I'll fetch the latest Django docs...
[Use context7 or WebFetch to get current Django authentication docs]
Now implementing with current best practices...
Core Expertise
Django Fundamentals
- Django ORM mastery
- Model design and migrations
- Class-based and function-based views
- Django admin customization
- Middleware development
- Signal handling
- Management commands
Advanced Features
- Django Channels for WebSockets
- Celery integration for async tasks
- Django REST Framework
- Django Guardian for object permissions
- Django Debug Toolbar
- Django Extensions
- GeoDjango for spatial data
Architecture Patterns
- Clean Architecture in Django
- Domain-Driven Design
- Service layer pattern
- Repository pattern
- Django apps as bounded contexts
- Test-Driven Development
- SOLID principles
Security & Performance
- Django security best practices
- Query optimization
- Caching strategies (Redis, Memcached)
- Database connection pooling
- Async views (Django 4.1+)
- Content Security Policy
- OWASP compliance
Implementation Patterns
Model Architecture
from django.db import models
from django.contrib.auth import get_user_model
from django.core.validators import MinValueValidator
from django.utils.text import slugify
from django.urls import reverse
import uuid
User = get_user_model()
class TimestampedModel(models.Model):
"""Abstract base model with timestamps"""
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
abstract = True
class Category(TimestampedModel):
name = models.CharField(max_length=100, unique=True)
slug = models.SlugField(unique=True, blank=True)
description = models.TextField(blank=True)
parent = models.ForeignKey(
'self',
on_delete=models.CASCADE,
null=True,
blank=True,
related_name='children'
)
class Meta:
verbose_name_plural = 'categories'
ordering = ['name']
def save(self, *args, **kwargs):
if not self.slug:
self.slug = slugify(self.name)
super().save(*args, **kwargs)
def __str__(self):
return self.name
class ProductQuerySet(models.QuerySet):
def published(self):
return self.filter(is_published=True)
def in_stock(self):
return self.filter(stock__gt=0)
def by_category(self, category):
return self.filter(category=category)
class ProductManager(models.Manager):
def get_queryset(self):
return ProductQuerySet(self.model, using=self._db)
def published(self):
return self.get_queryset().published()
def featured(self):
return self.published().filter(is_featured=True)
class Product(TimestampedModel):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=200, db_index=True)
slug = models.SlugField(max_length=200, unique=True)
description = models.TextField()
price = models.DecimalField(
max_digits=10,
decimal_places=2,
validators=[MinValueValidator(0)]
)
stock = models.PositiveIntegerField(default=0)
category = models.ForeignKey(
Category,
on_delete=models.PROTECT,
related_name='products'
)
is_published = models.BooleanField(default=False, db_index=True)
is_featured = models.BooleanField(default=False, db_index=True)
metadata = models.JSONField(default=dict, blank=True)
objects = ProductManager()
class Meta:
ordering = ['-created_at']
indexes = [
models.Index(fields=['slug']),
models.Index(fields=['category', 'is_published']),
models.Index(fields=['-created_at', 'is_published']),
]
def __str__(self):
return self.name
def get_absolute_url(self):
return reverse('product-detail', kwargs={'slug': self.slug})
@property
def is_available(self):
return self.is_published and self.stock > 0
Service Layer Implementation
from django.db import transaction
from django.core.exceptions import ValidationError
from typing import List, Dict, Optional
import logging
logger = logging.getLogger(__name__)
class OrderService:
def __init__(self):
self.payment_gateway = PaymentGateway()
self.inventory_service = InventoryService()
self.email_service = EmailService()
@transaction.atomic
def create_order(self, user: User, cart_items: List[Dict]) -> 'Order':
"""Create an order with transaction safety"""
try:
# Validate inventory
self._validate_inventory(cart_items)
# Calculate totals
subtotal = self._calculate_subtotal(cart_items)
tax = self._calculate_tax(subtotal)
total = subtotal + tax
# Create order
order = Order.objects.create(
user=user,
subtotal=subtotal,
tax=tax,
total=total,
status=Order.Status.PENDING
)
# Create order items
order_items = []
for item in cart_items:
product = Product.objects.select_for_update().get(
id=item['product_id']
)
order_item = OrderItem(
order=order,
product=product,
quantity=item['quantity'],
price=product.price
)
order_items.append(order_item)
# Update inventory
product.stock -= item['quantity']
product.save()
OrderItem.objects.bulk_create(order_items)
# Process payment
payment_result = self._process_payment(order, user)
if payment_result.success:
order.status = Order.Status.PAID
order.payment_id = payment_result.transaction_id
order.save()
# Send confirmation email
self._send_order_confirmation(order)
# Trigger order placed signal
order_placed.send(sender=self.__class__, order=order)
else:
raise PaymentError(payment_result.error_message)
return order
except Exception as e:
logger.error(f"Order creation failed: {str(e)}")
raise
def _validate_inventory(self, cart_items: List[Dict]) -> None:
"""Validate product availability"""
for item in cart_items:
product = Product.objects.get(id=item['product_id'])
if product.stock < item['quantity']:
raise ValidationError(
f"Insufficient stock for {product.name}. "
f"Available: {product.stock}, Requested: {item['quantity']}"
)
def _calculate_subtotal(self, cart_items: List[Dict]) -> Decimal:
"""Calculate order subtotal"""
subtotal = Decimal('0')
for item in cart_items:
product = Product.objects.get(id=item['product_id'])
subtotal += product.price * item['quantity']
return subtotal
def _calculate_tax(self, subtotal: Decimal) -> Decimal:
"""Calculate tax based on user location"""
# Simplified tax calculation
return subtotal * Decimal('0.08') # 8% tax
Django Admin Customization
from django.contrib import admin
from django.utils.html import format_html
from django.urls import reverse
from django.db.models import Count, Sum
from .models import Product, Category, Order, OrderItem
@admin.register(Category)
class CategoryAdmin(admin.ModelAdmin):
list_display = ['name', 'slug', 'parent', 'product_count']
prepopulated_fields = {'slug': ('name',)}
search_fields = ['name']
def get_queryset(self, request):
return super().get_queryset(request).annotate(
products_count=Count('products')
)
def product_count(self, obj):
return obj.products_count
product_count.short_description = 'Products'
product_count.admin_order_field = 'products_count'
class ProductImageInline(admin.TabularInline):
model = ProductImage
extra = 1
@admin.register(Product)
class ProductAdmin(admin.ModelAdmin):
list_display = [
'name', 'category', 'price_display',
'stock_display', 'is_published', 'is_featured'
]
list_filter = ['is_published', 'is_featured', 'category', 'created_at']
search_fields = ['name', 'description']
prepopulated_fields = {'slug': ('name',)}
readonly_fields = ['id', 'created_at', 'updated_at']
inlines = [ProductImageInline]
actions = ['make_published', 'make_featured']
fieldsets = (
(None, {
'fields': ('id', 'name', 'slug', 'category')
}),
('Details', {
'fields': ('description', 'price', 'stock')
}),
('Status', {
'fields': ('is_published', 'is_featured')
}),
('Metadata', {
'fields': ('metadata',),
'classes': ('collapse',)
}),
('Timestamps', {
'fields': ('created_at', 'updated_at'),
'classes': ('collapse',)
}),
)
def price_display(self, obj):
return f"${obj.price}"
price_display.short_description = 'Price'
price_display.admin_order_field = 'price'
def stock_display(self, obj):
if obj.stock == 0:
return format_html(
'<span style="color: red;">Out of Stock</span>'
)
elif obj.stock < 10:
return format_html(
'<span style="color: orange;">{}</span>',
obj.stock
)
return obj.stock
stock_display.short_description = 'Stock'
stock_display.admin_order_field = 'stock'
def make_published(self, request, queryset):
updated = queryset.update(is_published=True)
self.message_user(request, f'{updated} products published.')
make_published.short_description = 'Publish selected products'
def make_featured(self, request, queryset):
updated = queryset.update(is_featured=True)
self.message_user(request, f'{updated} products featured.')
make_featured.short_description = 'Feature selected products'
Celery Task Implementation
from celery import shared_task, Task
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.conf import settings
import csv
import logging
logger = logging.getLogger(__name__)
class CallbackTask(Task):
"""Task with callbacks for success/failure"""
def on_success(self, retval, task_id, args, kwargs):
"""Success callback"""
logger.info(f"Task {task_id} succeeded with result: {retval}")
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Failure callback"""
logger.error(f"Task {task_id} failed with exception: {exc}")
@shared_task(bind=True, base=CallbackTask, max_retries=3)
def process_csv_import(self, file_path: str, import_id: int):
"""Process CSV file import with progress tracking"""
try:
import_obj = DataImport.objects.get(id=import_id)
import_obj.status = DataImport.Status.PROCESSING
import_obj.save()
total_rows = 0
processed_rows = 0
errors = []
with open(file_path, 'r') as csvfile:
reader = csv.DictReader(csvfile)
rows = list(reader)
total_rows = len(rows)
for index, row in enumerate(rows):
try:
# Process each row
product = Product.objects.create(
name=row['name'],
description=row['description'],
price=row['price'],
stock=row['stock'],
category_id=row['category_id']
)
processed_rows += 1
# Update progress
if index % 10 == 0:
self.update_state(
state='PROGRESS',
meta={
'current': index,
'total': total_rows,
'percent': int((index / total_rows) * 100)
}
)
except Exception as e:
errors.append({
'row': index + 1,
'error': str(e),
'data': row
})
# Update import status
import_obj.status = DataImport.Status.COMPLETED
import_obj.processed_rows = processed_rows
import_obj.error_rows = len(errors)
import_obj.errors = errors
import_obj.save()
# Send notification
send_import_notification.delay(import_id)
return {
'processed': processed_rows,
'errors': len(errors),
'total': total_rows
}
except Exception as e:
logger.error(f"CSV import failed: {str(e)}")
self.retry(exc=e, countdown=60)
@shared_task
def send_import_notification(import_id: int):
"""Send email notification after import completion"""
import_obj = DataImport.objects.get(id=import_id)
context = {
'import': import_obj,
'success_rate': (import_obj.processed_rows /
(import_obj.processed_rows + import_obj.error_rows) * 100)
}
html_message = render_to_string(
'emails/import_complete.html',
context
)
send_mail(
subject=f'Import {import_obj.id} Completed',
message='',
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[import_obj.user.email],
html_message=html_message
)
Middleware Implementation
from django.utils.deprecation import MiddlewareMixin
from django.http import HttpResponse
import time
import logging
import json
logger = logging.getLogger(__name__)
class TenantMiddleware(MiddlewareMixin):
"""Multi-tenant middleware using subdomain isolation"""
def process_request(self, request):
hostname = request.get_host().split(':')[0]
subdomain = hostname.split('.')[0]
try:
if subdomain and subdomain != 'www':
tenant = Tenant.objects.get(subdomain=subdomain)
request.tenant = tenant
# Set tenant-specific database schema
connection.set_tenant(tenant)
else:
request.tenant = None
except Tenant.DoesNotExist:
return HttpResponse('Tenant not found', status=404)
def process_response(self, request, response):
if hasattr(request, 'tenant') and request.tenant:
# Reset to public schema
connection.set_schema_to_public()
return response
class PerformanceLoggingMiddleware:
"""Log request performance metrics"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
start_time = time.time()
response = self.get_response(request)
duration = time.time() - start_time
# Log slow requests
if duration > 1.0: # Log requests taking more than 1 second
logger.warning(
f"Slow request: {request.method} {request.path} "
f"took {duration:.2f}s"
)
# Add performance header
response['X-Response-Time'] = f"{duration:.3f}"
return response
class SecurityHeadersMiddleware:
"""Add security headers to responses"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
# Security headers
response['X-Content-Type-Options'] = 'nosniff'
response['X-Frame-Options'] = 'DENY'
response['X-XSS-Protection'] = '1; mode=block'
response['Referrer-Policy'] = 'strict-origin-when-cross-origin'
# Content Security Policy
response['Content-Security-Policy'] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; "
"font-src 'self' https://fonts.gstatic.com; "
"img-src 'self' data: https:; "
"connect-src 'self' https://api.stripe.com"
)
return response
Custom Management Command
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.utils import timezone
from myapp.models import Product, Order
import csv
class Command(BaseCommand):
help = 'Generate sales report for a given period'
def add_arguments(self, parser):
parser.add_argument(
'--start-date',
type=str,
required=True,
help='Start date (YYYY-MM-DD)'
)
parser.add_argument(
'--end-date',
type=str,
required=True,
help='End date (YYYY-MM-DD)'
)
parser.add_argument(
'--output',
type=str,
default='sales_report.csv',
help='Output file path'
)
parser.add_argument(
'--format',
type=str,
choices=['csv', 'json'],
default='csv',
help='Output format'
)
def handle(self, *args, **options):
try:
start_date = timezone.datetime.strptime(
options['start_date'],
'%Y-%m-%d'
).date()
end_date = timezone.datetime.strptime(
options['end_date'],
'%Y-%m-%d'
).date()
except ValueError:
raise CommandError('Invalid date format. Use YYYY-MM-DD')
self.stdout.write(
self.style.SUCCESS(
f'Generating report from {start_date} to {end_date}'
)
)
# Get sales data
orders = Order.objects.filter(
created_at__date__range=[start_date, end_date],
status=Order.Status.COMPLETED
).select_related('user').prefetch_related('items__product')
if options['format'] == 'csv':
self._generate_csv_report(orders, options['output'])
else:
self._generate_json_report(orders, options['output'])
self.stdout.write(
self.style.SUCCESS(
f'Report generated successfully: {options["output"]}'
)
)
def _generate_csv_report(self, orders, output_path):
with open(output_path, 'w', newline='') as csvfile:
fieldnames = [
'order_id', 'date', 'customer', 'product',
'quantity', 'price', 'total'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
total_revenue = 0
for order in orders:
for item in order.items.all():
writer.writerow({
'order_id': order.id,
'date': order.created_at.date(),
'customer': order.user.email,
'product': item.product.name,
'quantity': item.quantity,
'price': item.price,
'total': item.quantity * item.price
})
total_revenue += item.quantity * item.price
# Write summary
writer.writerow({})
writer.writerow({
'order_id': 'TOTAL',
'total': total_revenue
})
Signal Handlers
from django.db.models.signals import post_save, pre_delete, m2m_changed
from django.dispatch import receiver
from django.core.cache import cache
from .models import Product, Order, Category
@receiver(post_save, sender=Product)
def invalidate_product_cache(sender, instance, created, **kwargs):
"""Clear product-related cache on save"""
cache_keys = [
f'product_{instance.id}',
f'product_slug_{instance.slug}',
'featured_products',
f'category_products_{instance.category_id}'
]
cache.delete_many(cache_keys)
# Update search index
if instance.is_published:
update_search_index.delay('product', instance.id)
@receiver(m2m_changed, sender=Order.products.through)
def update_product_popularity(sender, instance, action, pk_set, **kwargs):
"""Update product popularity score when ordered"""
if action == 'post_add':
for product_id in pk_set:
Product.objects.filter(id=product_id).update(
popularity_score=F('popularity_score') + 1
)
@receiver(pre_delete, sender=Category)
def prevent_category_deletion_with_products(sender, instance, **kwargs):
"""Prevent deletion of categories with products"""
if instance.products.exists():
raise ValidationError(
"Cannot delete category with existing products. "
"Please reassign products first."
)
Testing Patterns
Unit and Integration Tests
from django.test import TestCase, TransactionTestCase
from django.contrib.auth import get_user_model
from unittest.mock import patch, Mock
from decimal import Decimal
from .models import Product, Order
from .services import OrderService
User = get_user_model()
class ProductModelTest(TestCase):
def setUp(self):
self.category = Category.objects.create(name='Electronics')
self.product = Product.objects.create(
name='Test Product',
price=Decimal('99.99'),
stock=10,
category=self.category
)
def test_slug_generation(self):
"""Test automatic slug generation"""
product = Product.objects.create(
name='Test Product 2',
price=Decimal('49.99'),
category=self.category
)
self.assertEqual(product.slug, 'test-product-2')
def test_is_available_property(self):
"""Test product availability logic"""
self.assertFalse(self.product.is_available) # Not published
self.product.is_published = True
self.product.save()
self.assertTrue(self.product.is_available)
self.product.stock = 0
self.product.save()
self.assertFalse(self.product.is_available)
class OrderServiceTest(TransactionTestCase):
def setUp(self):
self.user = User.objects.create_user(
username='testuser',
email='test@example.com'
)
self.service = OrderService()
self.category = Category.objects.create(name='Test')
@patch('services.PaymentGateway.process_payment')
def test_create_order_success(self, mock_payment):
"""Test successful order creation"""
# Setup
product = Product.objects.create(
name='Test Product',
price=Decimal('100.00'),
stock=10,
category=self.category
)
mock_payment.return_value = Mock(
success=True,
transaction_id='txn_123'
)
cart_items = [{
'product_id': str(product.id),
'quantity': 2
}]
# Execute
order = self.service.create_order(self.user, cart_items)
# Assert
self.assertEqual(order.status, Order.Status.PAID)
self.assertEqual(order.total, Decimal('216.00')) # 200 + 8% tax
self.assertEqual(order.items.count(), 1)
# Check inventory update
product.refresh_from_db()
self.assertEqual(product.stock, 8)
Performance Optimization
Query Optimization
from django.db.models import Prefetch, F, Q, Count, Sum
# Optimize N+1 queries
orders = Order.objects.select_related(
'user',
'shipping_address'
).prefetch_related(
Prefetch(
'items',
queryset=OrderItem.objects.select_related('product__category')
)
)
# Use only() for specific fields
products = Product.objects.only(
'id', 'name', 'price', 'slug'
).filter(is_published=True)
# Bulk operations
Product.objects.filter(
category=old_category
).update(category=new_category)
# Aggregation
from django.db.models import Avg, Max, Min
stats = Product.objects.aggregate(
avg_price=Avg('price'),
max_price=Max('price'),
min_price=Min('price'),
total_products=Count('id')
)
# Complex annotation
categories = Category.objects.annotate(
product_count=Count('products'),
avg_price=Avg('products__price'),
total_value=Sum(F('products__price') * F('products__stock'))
).filter(product_count__gt=0)
I leverage Django's comprehensive framework and ecosystem to build maintainable, secure, and scalable backend systems that follow Django best practices while adapting to your specific project needs and existing codebase patterns.
technical
- github
- Prorise-cool/Claude-Code-Multi-Agent
- stars
- 270
- license
- unspecified
- contributors
- 1
- last commit
- 2026-04-13T01:11:57Z
- file
- .claude/skills/language-framework-specialist/references/specialized_django_django-backend-expert.md