当前位置: 首页 > article >正文

每天40分玩转Django:Django Channels

Django Channels

一、今日学习内容概览

知识模块重点内容难度系数
WebSocket基础WebSocket协议、连接建立、消息传输★★★★☆
消息路由URL路由配置、消费者编写、消息处理★★★★☆
Channels配置项目配置、ASGI应用、Channel Layers★★★☆☆

二、WebSocket基础

1. 环境配置

首先安装必要的包:

pip install channels
pip install channels-redis
redis-server  # 确保Redis服务已启动

2. 项目配置

# settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'channels',  # 添加channels
    'chat',      # 示例应用
]

# 将ASGI应用设置为默认
ASGI_APPLICATION = 'myproject.asgi.application'

# Channel层配置
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}
# asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from chat.routing import websocket_urlpatterns

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            websocket_urlpatterns
        )
    ),
})

让我们创建一个流程图来说明WebSocket连接的建立过程:
在这里插入图片描述

三、实现实时聊天室

1. 创建Consumer

# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        # 将channel加入群组
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        # 将channel从群组中移除
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']
        username = text_data_json.get('username', 'Anonymous')

        # 向群组发送消息
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message,
                'username': username
            }
        )

    async def chat_message(self, event):
        message = event['message']
        username = event['username']

        # 向WebSocket发送消息
        await self.send(text_data=json.dumps({
            'message': message,
            'username': username
        }))

2. 配置路由

# chat/routing.py
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]

3. 创建视图和模板

# chat/views.py
from django.shortcuts import render

def index(request):
    return render(request, 'chat/index.html')

def room(request, room_name):
    return render(request, 'chat/room.html', {
        'room_name': room_name
    })
# chat/urls.py
from django.urls import path
from . import views

urlpatterns = [
    path('', views.index, name='index'),
    path('<str:room_name>/', views.room, name='room'),
]
<!-- chat/templates/chat/room.html -->
<!DOCTYPE html>
<html>
<head>
    <title>Chat Room</title>
    <style>
        #chat-messages {
            height: 300px;
            overflow-y: scroll;
            border: 1px solid #ccc;
            padding: 10px;
            margin-bottom: 10px;
        }
    </style>
</head>
<body>
    <div id="chat-messages"></div>
    <input type="text" id="username" placeholder="Your name">
    <input type="text" id="chat-message-input" placeholder="Message">
    <button id="chat-message-submit">Send</button>

    <script>
        const roomName = {{ room_name|safe }};
        const chatSocket = new WebSocket(
            'ws://' + window.location.host + '/ws/chat/' + roomName + '/'
        );

        const messagesDiv = document.querySelector('#chat-messages');
        const messageInput = document.querySelector('#chat-message-input');
        const usernameInput = document.querySelector('#username');
        const submitButton = document.querySelector('#chat-message-submit');

        chatSocket.onmessage = function(e) {
            const data = JSON.parse(e.data);
            const messageElement = document.createElement('div');
            messageElement.textContent = `${data.username}: ${data.message}`;
            messagesDiv.appendChild(messageElement);
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        };

        chatSocket.onclose = function(e) {
            console.error('Chat socket closed unexpectedly');
        };

        messageInput.focus();
        messageInput.onkeyup = function(e) {
            if (e.keyCode === 13) {
                submitButton.click();
            }
        };

        submitButton.onclick = function(e) {
            const message = messageInput.value;
            const username = usernameInput.value || 'Anonymous';
            if (message) {
                chatSocket.send(JSON.stringify({
                    'message': message,
                    'username': username
                }));
                messageInput.value = '';
            }
        };
    </script>
</body>
</html>

4. 增加错误处理和重连机制

# chat/consumers.py
class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        try:
            self.room_name = self.scope['url_route']['kwargs']['room_name']
            self.room_group_name = f'chat_{self.room_name}'

            await self.channel_layer.group_add(
                self.room_group_name,
                self.channel_name
            )
            await self.accept()
            
            # 发送连接成功消息
            await self.send(text_data=json.dumps({
                'type': 'connection_established',
                'message': 'Connected to chat room'
            }))
            
        except Exception as e:
            await self.close(code=4000)
            
    async def receive(self, text_data):
        try:
            text_data_json = json.loads(text_data)
            message = text_data_json['message']
            username = text_data_json.get('username', 'Anonymous')
            
            # 消息验证
            if not message or len(message) > 1000:
                raise ValueError('Invalid message')
                
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'chat_message',
                    'message': message,
                    'username': username
                }
            )
            
        except json.JSONDecodeError:
            await self.send(text_data=json.dumps({
                'type': 'error',
                'message': 'Invalid JSON format'
            }))
        except ValueError as e:
            await self.send(text_data=json.dumps({
                'type': 'error',
                'message': str(e)
            }))

5. 添加用户在线状态管理

# chat/consumers.py
from channels.db import database_sync_to_async
from django.contrib.auth.models import User

class ChatConsumer(AsyncWebsocketConsumer):
    connected_users = {}
    
    @database_sync_to_async
    def get_user_info(self):
        if self.scope["user"].is_authenticated:
            return {
                'username': self.scope["user"].username,
                'id': self.scope["user"].id
            }
        return None
        
    async def connect(self):
        # ... 之前的连接代码 ...
        
        # 添加用户到在线列表
        user_info = await self.get_user_info()
        if user_info:
            self.connected_users[self.channel_name] = user_info
            
            # 广播用户上线消息
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'user_join',
                    'username': user_info['username']
                }
            )
            
    async def disconnect(self, close_code):
        # 移除用户从在线列表
        if self.channel_name in self.connected_users:
            user_info = self.connected_users.pop(self.channel_name)
            
            # 广播用户下线消息
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'user_leave',
                    'username': user_info['username']
                }
            )
            
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )
        
    async def user_join(self, event):
        await self.send(text_data=json.dumps({
            'type': 'user_join',
            'username': event['username']
        }))
        
    async def user_leave(self, event):
        await self.send(text_data=json.dumps({
            'type': 'user_leave',
            'username': event['username']
        }))

四、性能优化和最佳实践

  1. Channel Layer优化
# settings.py
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
            "capacity": 1500,  # 默认100
            "expiry": 10,  # 默认60
        },
    },
}
  1. 消息批处理
class ChatConsumer(AsyncWebsocketConsumer):
    message_queue = []
    
    async def receive(self, text_data):
        # ... 消息处理代码 ...
        
        self.message_queue.append({
            'type': 'chat_message',
            'message': message,
            'username': username
        })
        
        if len(self.message_queue) >= 10:
            await self.flush_message_queue()
            
    async def flush_message_queue(self):
        if self.message_queue:
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    'type': 'bulk_messages',
                    'messages': self.message_queue
                }
            )
            self.message_queue = []

五、测试

# chat/tests.py
import pytest
from channels.testing import WebsocketCommunicator
from channels.routing import URLRouter
from django.urls import re_path
from .consumers import ChatConsumer

@pytest.mark.asyncio
async def test_chat_consumer():
    application = URLRouter([
        re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
    ])
    communicator = WebsocketCommunicator(
        application=application,
        path='/ws/chat/testroom/'
    )
    connected, _ = await communicator.connect()
    assert connected
    
    # 测试发送消息
    await communicator.send_json_to({
        'message': 'hello',
        'username': 'testuser'
    })
    
    response = await communicator.receive_json_from()
    assert response['message'] == 'hello'
    assert response['username'] == 'testuser'
    
    await communicator.disconnect()

六、总结与进阶建议

今天我们学习了Django Channels的核心概念和实现方法。重点包括:

  1. WebSocket基础知识
  2. Consumer的编写和配置
  3. 消息路由系统
  4. Channel Layer的使用
  5. 错误处理和重连机制
  6. 性能优化

怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!


http://www.kler.cn/a/458870.html

相关文章:

  • GeoTrust True BusinessID Wildcard
  • Angular Firebase CRUD 项目推荐
  • Python爬取城市天气信息,并存储到csv文件中
  • C++ 设计模式:中介者模式(Mediator Pattern)
  • Gemma2 2B 模型的model.safetensors.index.json文件解析
  • C++设计模式:状态模式(自动售货机)
  • react-native键盘遮盖底部输入框问题修复
  • 对于多个网站的爬虫管理和配置支持
  • 前端处理跨域的几种方式
  • AI 加持下的智能家居行业:变革、挑战与机遇
  • 深度学习-78-大模型量化之Quantization Aware Training量化感知训练QAT
  • LeetCode每日三题(五)双指针
  • 基于PLC的电梯控制系统(论文+源码)
  • 从Huggingface下载的数据集为arrow格式,如何从本地路径读取arrow数据并输出样例
  • Knowledge is power——Digital Electronics
  • pytorch基础之注解的使用--003
  • 「Mac玩转仓颉内测版55」应用篇2 - 使用函数实现更复杂的计算
  • 项目优化性能监控
  • 基于YOLOv10和BYTETracker的多目标追踪系统,实现地铁人流量计数功能(基于复杂场景和密集遮挡条件下)
  • 前端学习DAY29(1688侧边栏)
  • NPM组件包 vant部分版本内嵌挖矿代码
  • 《燕云十六声》d3dcompiler_47.dll缺失怎么解决?
  • 深度学习中的HTTP:从请求到响应的计算机网络交互
  • JVM实战—5.G1垃圾回收器的原理和调优
  • windows 下通过脚本方式实现 类似 Linux keepalived IP 动态绑定效果
  • 有限元分析学习——Anasys Workbanch第一阶段笔记(2)应力奇异及位移结果对比、初步了解单元的基本知识