当前位置: 首页 > article >正文

每天40分玩转Django:Django性能优化

Django性能优化

一、性能优化要点总览表

优化类别具体措施预期效果
数据库优化索引优化、查询优化、N+1问题提升查询速度、减少数据库负载
缓存优化Redis缓存、页面缓存、查询缓存减少数据库访问、提升响应速度
异步处理Celery任务队列、异步视图、异步ORM提升并发性能、优化用户体验

二、数据库优化实现

1. 模型设计和索引优化

# models.py
from django.db import models
from django.db.models import Index

class Category(models.Model):
    name = models.CharField(max_length=100)
    slug = models.SlugField(unique=True)
    
    class Meta:
        indexes = [
            models.Index(fields=['name']),
            models.Index(fields=['slug'])
        ]

class Article(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()
    category = models.ForeignKey(Category, on_delete=models.CASCADE)
    author = models.ForeignKey('auth.User', on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    views = models.IntegerField(default=0)
    
    class Meta:
        indexes = [
            models.Index(fields=['created_at', '-views']),
            models.Index(fields=['category', 'author']),
        ]

2. 查询优化

# views.py
from django.db.models import Prefetch, Count, Q
from django.views.generic import ListView

class ArticleListView(ListView):
    model = Article
    template_name = 'articles/article_list.html'
    context_object_name = 'articles'
    
    def get_queryset(self):
        # 使用select_related减少外键查询
        queryset = Article.objects.select_related(
            'category', 'author'
        ).prefetch_related(
            'tags',
            Prefetch(
                'comments',
                queryset=Comment.objects.select_related('user')
            )
        )
        
        # 添加过滤条件
        category = self.request.GET.get('category')
        if category:
            queryset = queryset.filter(category__slug=category)
            
        # 添加聚合查询
        queryset = queryset.annotate(
            comment_count=Count('comments'),
            like_count=Count('likes')
        )
        
        return queryset.order_by('-created_at')

3. 批量操作优化

# utils.py
from django.db import transaction

def bulk_create_articles(articles_data):
    """批量创建文章"""
    with transaction.atomic():
        articles = [
            Article(
                title=data['title'],
                content=data['content'],
                category_id=data['category_id'],
                author_id=data['author_id']
            )
            for data in articles_data
        ]
        return Article.objects.bulk_create(articles)

def bulk_update_views(article_ids):
    """批量更新文章浏览量"""
    with transaction.atomic():
        Article.objects.filter(id__in=article_ids).update(
            views=models.F('views') + 1
        )

三、缓存优化实现

1. Redis缓存配置

# settings.py
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
        }
    }
}

# 使用Redis作为Session后端
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'default'

2. 视图缓存实现

# views.py
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from django.core.cache import cache
from django.conf import settings

@method_decorator(cache_page(60 * 15), name='dispatch')
class CategoryListView(ListView):
    model = Category
    template_name = 'categories/category_list.html'

class ArticleDetailView(DetailView):
    model = Article
    
    def get_object(self):
        article_id = self.kwargs['pk']
        cache_key = f'article_{article_id}'
        
        # 尝试从缓存获取
        article = cache.get(cache_key)
        if article is None:
            # 缓存未命中,从数据库获取
            article = super().get_object()
            # 存入缓存
            cache.set(cache_key, article, timeout=60*30)
        
        return article

def get_popular_articles():
    cache_key = 'popular_articles'
    articles = cache.get(cache_key)
    
    if articles is None:
        articles = Article.objects.annotate(
            total_score=Count('likes') + Count('comments')
        ).order_by('-total_score')[:10]
        cache.set(cache_key, articles, timeout=60*60)
    
    return articles

3. 缓存装饰器

# decorators.py
from functools import wraps
from django.core.cache import cache

def cache_result(timeout=300):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 生成缓存键
            cache_key = f"{func.__name__}:{args}:{kwargs}"
            result = cache.get(cache_key)
            
            if result is None:
                result = func(*args, **kwargs)
                cache.set(cache_key, result, timeout=timeout)
            
            return result
        return wrapper
    return decorator

# 使用示例
@cache_result(timeout=60*5)
def get_article_stats(article_id):
    return {
        'views': Article.objects.get(id=article_id).views,
        'comments': Comment.objects.filter(article_id=article_id).count(),
        'likes': Like.objects.filter(article_id=article_id).count()
    }

四、异步处理实现

1. Celery配置和任务

# celery.py
from celery import Celery
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import Article

@shared_task
def send_article_notification(article_id):
    article = Article.objects.get(id=article_id)
    subscribers = article.category.subscribers.all()
    
    for subscriber in subscribers:
        send_mail(
            f'新文章: {article.title}',
            f'查看最新文章:{article.get_absolute_url()}',
            'noreply@example.com',
            [subscriber.email],
            fail_silently=False,
        )

@shared_task
def update_article_stats():
    """定期更新文章统计信息"""
    for article in Article.objects.all():
        stats = get_article_stats(article.id)
        article.stats_cache = stats
        article.save()

2. 异步视图实现

# views.py
from django.http import JsonResponse
from asgiref.sync import sync_to_async
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

async def async_article_detail(request, pk):
    article = await sync_to_async(Article.objects.get)(id=pk)
    context = {
        'title': article.title,
        'content': article.content,
        'author': article.author.username
    }
    return JsonResponse(context)

class CommentCreateView(View):
    def post(self, request, article_id):
        comment = Comment.objects.create(
            article_id=article_id,
            user=request.user,
            content=request.POST['content']
        )
        
        # 异步发送WebSocket通知
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            f"article_{article_id}",
            {
                "type": "comment.notification",
                "message": {
                    "comment_id": comment.id,
                    "user": comment.user.username,
                    "content": comment.content
                }
            }
        )
        
        return JsonResponse({'status': 'success'})

3. 异步中间件

# middleware.py
from django.core.cache import cache
from asgiref.sync import sync_to_async

class AsyncCacheMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response
    
    async def __call__(self, request):
        cache_key = f"page_cache:{request.path}"
        response = await sync_to_async(cache.get)(cache_key)
        
        if response is None:
            response = await self.get_response(request)
            await sync_to_async(cache.set)(
                cache_key,
                response,
                timeout=300
            )
        
        return response

五、性能监控

1. 查询日志记录

# middleware.py
import time
import logging
from django.db import connection

logger = logging.getLogger(__name__)

class QueryCountMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response
    
    def __call__(self, request):
        start_time = time.time()
        initial_queries = len(connection.queries)
        
        response = self.get_response(request)
        
        total_time = time.time() - start_time
        total_queries = len(connection.queries) - initial_queries
        
        if total_queries > 10:  # 设置查询数量阈值
            logger.warning(
                f'Path: {request.path} - '
                f'Queries: {total_queries} - '
                f'Time: {total_time:.2f}s'
            )
        
        return response

六、性能优化流程图

在这里插入图片描述

最佳实践建议:

  1. 数据库优化:

    • 合理使用索引
    • 避免N+1查询问题
    • 使用批量操作替代循环操作
    • 定期分析和优化慢查询
  2. 缓存策略:

    • 合理设置缓存时间
    • 分层缓存架构
    • 及时更新缓存
    • 避免缓存雪崩
  3. 异步处理:

    • 合理使用任务队列
    • 异步处理耗时操作
    • 适当使用异步视图
    • 监控任务执行状态

这就是关于Django性能优化的详细内容。通过实践这些优化策略,你可以显著提升Django应用的性能。如果有任何问题,欢迎随时提出!


怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!


http://www.kler.cn/a/465709.html

相关文章:

  • reactor中的并发
  • npm install --global windows-build-tools --save 失败
  • SQL Server 数据库 忘记密码
  • 设计模式学习[14]---状态模式
  • 【微服务】2、网关
  • 【信息系统项目管理师】高分论文:论信息系统项目的风险管理(数字化联合审查管理系统)
  • wpf 国际化 try catch comboBox
  • C# 设计模式(结构型模式):享元模式
  • 如何在iOS 11 中禁用高效图像格式 (HEIF)
  • 【Vim Masterclass 笔记01】Section 1:Course Overview + Section 2:Vim Quickstart
  • 适用不同业务场景的6种销售预测模型
  • WPF自定义任务栏缩略图
  • CSS进阶和SASS
  • halcon三维点云数据处理(二)
  • 《向量数据库指南》混合检索系统的深度探索与实践:从POC到生产级解决方案的构建
  • 毕设中所学
  • MCA:用于图像识别的深度卷积神经网络中的多维协同注意力
  • MAC录屏QuikTimePlayer工具录屏声音小免费解决方案
  • 机器学习之模型评估——混淆矩阵,交叉验证与数据标准化
  • PyQt下载M3U8文件
  • TK730 不允许更改资源库或跨客户端定制
  • JavaEE 初阶:线程(2)
  • RabbitMQ的常见面试题及其答案的总结
  • 美团商家端 字符验证码 分析
  • 使用npm 插件[mmdc]将.mmd时序图转换为图片
  • VuePress2配置unocss的闭坑指南