每天40分玩转Django:Django性能优化
Django性能优化
一、性能优化要点总览表
| 优化类别 | 具体措施 | 预期效果 |
| --- | --- | --- |
| 数据库优化 | 索引优化、查询优化、N+1问题 | 提升查询速度、减少数据库负载 |
| 缓存优化 | Redis缓存、页面缓存、查询缓存 | 减少数据库访问、提升响应速度 |
| 异步处理 | Celery任务队列、异步视图、异步ORM | 提升并发性能、优化用户体验 |
二、数据库优化实现
1. 模型设计和索引优化
# models.py
from django.db import models
from django.db.models import Index
class Category(models.Model):
    """Article category, addressable by a unique slug."""
    name = models.CharField(max_length=100)
    # unique=True already creates a unique database index on this column,
    # so no explicit Index entry is needed for it in Meta below.
    slug = models.SlugField(unique=True)

    class Meta:
        indexes = [
            # name is a frequent lookup/filter key.
            models.Index(fields=['name']),
            # FIX: removed the redundant Index on 'slug' — the field's
            # unique=True constraint already provides an index.
        ]
class Article(models.Model):
    """Blog article with a denormalized view counter."""
    title = models.CharField(max_length=200)
    content = models.TextField()
    category = models.ForeignKey(Category, on_delete=models.CASCADE)
    author = models.ForeignKey('auth.User', on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)  # set once on insert
    updated_at = models.DateTimeField(auto_now=True)      # refreshed on every save
    views = models.IntegerField(default=0)

    class Meta:
        indexes = [
            # Composite index serving "newest first" listings that also
            # order by popularity (descending views).
            models.Index(fields=['created_at', '-views']),
            # Covers filters that combine category and author.
            models.Index(fields=['category', 'author']),
        ]
2. 查询优化
# views.py
from django.db.models import Prefetch, Count, Q
from django.views.generic import ListView
class ArticleListView(ListView):
    """Paginated article list, optimized to avoid N+1 query patterns.

    ``select_related`` joins single-valued FK relations into the main
    query; ``prefetch_related`` batches each many-valued relation
    (tags, comments) into one additional query.
    """
    model = Article
    template_name = 'articles/article_list.html'
    context_object_name = 'articles'

    def get_queryset(self):
        # Join FK targets in the same query instead of one query per row.
        queryset = Article.objects.select_related(
            'category', 'author'
        ).prefetch_related(
            'tags',
            # Prefetch comments together with their authors so rendering
            # each comment's user does not fire an extra query.
            Prefetch(
                'comments',
                queryset=Comment.objects.select_related('user')
            )
        )
        # Optional filtering by category slug (?category=<slug>).
        category = self.request.GET.get('category')
        if category:
            queryset = queryset.filter(category__slug=category)
        # BUG FIX: combining two multi-valued aggregations in a single
        # annotate() cross-multiplies the joined rows and inflates both
        # counts. distinct=True makes each Count count only its own rows.
        queryset = queryset.annotate(
            comment_count=Count('comments', distinct=True),
            like_count=Count('likes', distinct=True)
        )
        return queryset.order_by('-created_at')
3. 批量操作优化
# utils.py
from django.db import transaction
def bulk_create_articles(articles_data):
    """Insert many articles with a single INSERT statement.

    Args:
        articles_data: iterable of dicts with keys 'title', 'content',
            'category_id' and 'author_id'.

    Returns:
        The list of created Article instances.
    """
    field_names = ('title', 'content', 'category_id', 'author_id')
    with transaction.atomic():
        pending = [
            Article(**{name: data[name] for name in field_names})
            for data in articles_data
        ]
        # One database round-trip instead of one INSERT per article.
        return Article.objects.bulk_create(pending)
def bulk_update_views(article_ids):
    """Atomically increment the view counter for many articles at once.

    An F() expression performs the increment in SQL (a single UPDATE),
    avoiding a read-modify-write race per article.
    """
    # BUG FIX: this module only imports `transaction`, so `models.F`
    # was an undefined name here. Import F locally.
    from django.db.models import F

    with transaction.atomic():
        Article.objects.filter(id__in=article_ids).update(
            views=F('views') + 1
        )
三、缓存优化实现
1. Redis缓存配置
# settings.py
# Cache configuration: django-redis backend, Redis database 1 on localhost.
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        }
    }
}
# Use Redis (the 'default' cache above) as the session backend.
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'default'
2. 视图缓存实现
# views.py
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from django.core.cache import cache
from django.conf import settings
# Cache the entire rendered response for 15 minutes; method_decorator
# applies the function decorator to the class-based view's dispatch().
@method_decorator(cache_page(60 * 15), name='dispatch')
class CategoryListView(ListView):
    """Category index page, served from the page cache when warm."""
    model = Category
    template_name = 'categories/category_list.html'
class ArticleDetailView(DetailView):
    """Article detail view backed by a per-object cache entry."""
    model = Article

    def get_object(self, queryset=None):
        # BUG FIX: DetailView.get_object accepts an optional queryset
        # argument; dropping it broke any caller that passes one.
        article_id = self.kwargs['pk']
        cache_key = f'article_{article_id}'
        # Try the cache first; a hit skips the database entirely.
        article = cache.get(cache_key)
        if article is None:
            # Cache miss: load via the normal DetailView path.
            article = super().get_object(queryset)
            # Cache the instance for 30 minutes.
            # NOTE(review): the entry is never invalidated on save or
            # delete, so stale reads are possible within the timeout.
            cache.set(cache_key, article, timeout=60 * 30)
        return article
def get_popular_articles():
    """Return the top-10 articles by likes + comments, cached for 1 hour."""
    cache_key = 'popular_articles'
    articles = cache.get(cache_key)
    if articles is None:
        # FIX 1: force evaluation with list() before caching — caching a
        # lazy QuerySet relies on pickling side effects; a plain list
        # caches the concrete result.
        # FIX 2: distinct=True — two multi-valued joins in one annotate()
        # otherwise multiply each other's rows and inflate both counts.
        articles = list(
            Article.objects.annotate(
                total_score=Count('likes', distinct=True)
                + Count('comments', distinct=True)
            ).order_by('-total_score')[:10]
        )
        cache.set(cache_key, articles, timeout=60 * 60)
    return articles
3. 缓存装饰器
# decorators.py
from functools import wraps
from django.core.cache import cache
def cache_result(timeout=300):
    """Decorator factory: cache a function's return value in the shared cache.

    Args:
        timeout: cache lifetime in seconds (default 5 minutes).

    The cache key is an MD5 digest of the function's qualified name plus
    the repr of its arguments, so keys stay short and free of characters
    some cache backends reject (spaces, braces, colons in values).
    NOTE: arguments must have a stable, value-based repr (ints, strings,
    tuples, ...). Objects whose repr embeds a memory address would make
    every call a cache miss.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            import hashlib
            # FIX: the previous key f"{name}:{args}:{kwargs}" depended on
            # the caller's kwarg ordering and embedded arbitrary repr text
            # in the key. Sort kwargs and hash for a deterministic key.
            raw = (
                f"{func.__module__}.{func.__qualname__}:"
                f"{args!r}:{sorted(kwargs.items())!r}"
            )
            cache_key = (
                "cache_result:" + hashlib.md5(raw.encode("utf-8")).hexdigest()
            )
            result = cache.get(cache_key)
            if result is None:
                # NOTE: a legitimately-None result is indistinguishable
                # from a miss and is recomputed on every call.
                result = func(*args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
            return result
        return wrapper
    return decorator
# Usage example
@cache_result(timeout=60 * 5)
def get_article_stats(article_id):
    """Aggregate view/comment/like counts for one article (cached 5 min)."""
    views = Article.objects.get(id=article_id).views
    comments = Comment.objects.filter(article_id=article_id).count()
    likes = Like.objects.filter(article_id=article_id).count()
    return {'views': views, 'comments': comments, 'likes': likes}
四、异步处理实现
1. Celery配置和任务
# celery.py
from celery import Celery
import os
# Point Celery's Django integration at the project settings before the
# app object is created.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Pull every CELERY_*-prefixed setting from Django's settings module.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Discover tasks.py modules in all installed Django apps.
app.autodiscover_tasks()
# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import Article
@shared_task
def send_article_notification(article_id):
    """Email every subscriber of the article's category about a new article.

    Runs in a Celery worker, so the article may already have been
    deleted between enqueue and execution — treat that as a no-op
    instead of failing the task.
    """
    try:
        article = Article.objects.get(id=article_id)
    except Article.DoesNotExist:
        # FIX: an unhandled DoesNotExist would mark the task failed
        # (and possibly retry forever) for a row that is simply gone.
        return
    subscribers = article.category.subscribers.all()
    for subscriber in subscribers:
        # One mail per subscriber keeps recipient addresses private.
        send_mail(
            f'新文章: {article.title}',
            f'查看最新文章:{article.get_absolute_url()}',
            'noreply@example.com',
            [subscriber.email],
            fail_silently=False,
        )
@shared_task
def update_article_stats():
    """Periodic task: refresh each article's cached statistics blob."""
    # iterator() streams rows instead of materializing every article in
    # memory at once; update_fields limits the UPDATE to the one column
    # we changed, so concurrent edits to other fields are not clobbered
    # by a full save().
    for article in Article.objects.all().iterator():
        stats = get_article_stats(article.id)
        article.stats_cache = stats
        article.save(update_fields=['stats_cache'])
2. 异步视图实现
# views.py
from django.http import JsonResponse
from asgiref.sync import sync_to_async
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
async def async_article_detail(request, pk):
    """Async JSON endpoint returning one article's title/content/author.

    BUG FIX: the original fetched only the article and then read
    ``article.author.username`` in async context — that lazy FK access
    issues a *synchronous* query and Django raises
    SynchronousOnlyOperation. select_related('author') joins the author
    inside the single sync_to_async-wrapped query instead.
    """
    article = await sync_to_async(
        Article.objects.select_related('author').get
    )(id=pk)
    context = {
        'title': article.title,
        'content': article.content,
        'author': article.author.username
    }
    return JsonResponse(context)
class CommentCreateView(View):
    """Create a comment and broadcast it to the article's WebSocket group."""

    def post(self, request, article_id):
        # FIX: request.POST['content'] raised KeyError (an HTTP 500) on a
        # malformed POST; reject missing/blank content with a 400 instead.
        content = request.POST.get('content', '').strip()
        if not content:
            return JsonResponse(
                {'status': 'error', 'message': 'content is required'},
                status=400,
            )
        comment = Comment.objects.create(
            article_id=article_id,
            user=request.user,
            content=content,
        )
        # Push the new comment to everyone watching this article.
        # async_to_sync bridges this sync view into the async channel layer.
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            f"article_{article_id}",
            {
                "type": "comment.notification",
                "message": {
                    "comment_id": comment.id,
                    "user": comment.user.username,
                    "content": comment.content
                }
            }
        )
        return JsonResponse({'status': 'success'})
3. 异步中间件
# middleware.py
from django.core.cache import cache
from asgiref.sync import sync_to_async
class AsyncCacheMiddleware:
    """Whole-page cache middleware for async request handling.

    FIX: Django only treats a middleware as a coroutine when it
    advertises async capability; without these flags (and marking the
    instance as a coroutine function) the async __call__ is invoked
    from a sync chain and returns an un-awaited coroutine.
    """
    async_capable = True
    sync_capable = False

    def __init__(self, get_response):
        # Django inspects the *instance* for coroutine-ness, so mark it.
        from asgiref.sync import markcoroutinefunction
        self.get_response = get_response
        markcoroutinefunction(self)

    async def __call__(self, request):
        cache_key = f"page_cache:{request.path}"
        # cache.get is a blocking call; run it in a worker thread.
        response = await sync_to_async(cache.get)(cache_key)
        if response is None:
            response = await self.get_response(request)
            # NOTE(review): this pickles the whole HttpResponse and keys
            # only on path (ignoring method/querystring/headers) — fine
            # for simple GET pages, unsafe for per-user or streaming
            # responses.
            await sync_to_async(cache.set)(
                cache_key,
                response,
                timeout=300
            )
        return response
五、性能监控
1. 查询日志记录
# middleware.py
import time
import logging
from django.db import connection
logger = logging.getLogger(__name__)
class QueryCountMiddleware:
    """Warn when a request triggers an unusually high number of SQL queries.

    Relies on ``connection.queries``, which Django populates only when
    settings.DEBUG is True — in production this middleware is effectively
    a no-op (both counts stay at zero).
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        started = time.time()
        queries_before = len(connection.queries)

        response = self.get_response(request)

        elapsed = time.time() - started
        executed = len(connection.queries) - queries_before
        # More than 10 queries for one request usually signals an N+1.
        if executed > 10:
            logger.warning(
                f'Path: {request.path} - '
                f'Queries: {executed} - '
                f'Time: {elapsed:.2f}s'
            )
        return response
六、性能优化流程图
最佳实践建议:
- 数据库优化:
- 合理使用索引
- 避免N+1查询问题
- 使用批量操作替代循环操作
- 定期分析和优化慢查询
- 缓存策略:
- 合理设置缓存时间
- 分层缓存架构
- 及时更新缓存
- 避免缓存雪崩
- 异步处理:
- 合理使用任务队列
- 异步处理耗时操作
- 适当使用异步视图
- 监控任务执行状态
这就是关于Django性能优化的详细内容。通过实践这些优化策略,你可以显著提升Django应用的性能。如果有任何问题,欢迎随时提出!
怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!