每天40分玩转Django:Django Channels
Django Channels
一、今日学习内容概览
知识模块 | 重点内容 | 难度系数 |
---|---|---|
WebSocket基础 | WebSocket协议、连接建立、消息传输 | ★★★★☆ |
消息路由 | URL路由配置、消费者编写、消息处理 | ★★★★☆ |
Channels配置 | 项目配置、ASGI应用、Channel Layers | ★★★☆☆ |
二、WebSocket基础
1. 环境配置
首先安装必要的包:
pip install channels
pip install channels-redis
redis-server # 确保Redis服务已启动
2. 项目配置
# settings.py
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'channels', # 添加channels
'chat', # 示例应用
]
# 将ASGI应用设置为默认
ASGI_APPLICATION = 'myproject.asgi.application'
# Channel层配置
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [('127.0.0.1', 6379)],
},
},
}
# asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from chat.routing import websocket_urlpatterns
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(
websocket_urlpatterns
)
),
})
让我们创建一个流程图来说明WebSocket连接的建立过程:
三、实现实时聊天室
1. 创建Consumer
# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f'chat_{self.room_name}'
# 将channel加入群组
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
# 将channel从群组中移除
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
username = text_data_json.get('username', 'Anonymous')
# 向群组发送消息
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat_message',
'message': message,
'username': username
}
)
async def chat_message(self, event):
message = event['message']
username = event['username']
# 向WebSocket发送消息
await self.send(text_data=json.dumps({
'message': message,
'username': username
}))
2. 配置路由
# chat/routing.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]
3. 创建视图和模板
# chat/views.py
from django.shortcuts import render
def index(request):
return render(request, 'chat/index.html')
def room(request, room_name):
return render(request, 'chat/room.html', {
'room_name': room_name
})
# chat/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('', views.index, name='index'),
path('<str:room_name>/', views.room, name='room'),
]
<!-- chat/templates/chat/room.html -->
<!DOCTYPE html>
<html>
<head>
<title>Chat Room</title>
<style>
#chat-messages {
height: 300px;
overflow-y: scroll;
border: 1px solid #ccc;
padding: 10px;
margin-bottom: 10px;
}
</style>
</head>
<body>
<div id="chat-messages"></div>
<input type="text" id="username" placeholder="Your name">
<input type="text" id="chat-message-input" placeholder="Message">
<button id="chat-message-submit">Send</button>
<script>
const roomName = {{ room_name|safe }};
const chatSocket = new WebSocket(
'ws://' + window.location.host + '/ws/chat/' + roomName + '/'
);
const messagesDiv = document.querySelector('#chat-messages');
const messageInput = document.querySelector('#chat-message-input');
const usernameInput = document.querySelector('#username');
const submitButton = document.querySelector('#chat-message-submit');
chatSocket.onmessage = function(e) {
const data = JSON.parse(e.data);
const messageElement = document.createElement('div');
messageElement.textContent = `${data.username}: ${data.message}`;
messagesDiv.appendChild(messageElement);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
chatSocket.onclose = function(e) {
console.error('Chat socket closed unexpectedly');
};
messageInput.focus();
messageInput.onkeyup = function(e) {
if (e.keyCode === 13) {
submitButton.click();
}
};
submitButton.onclick = function(e) {
const message = messageInput.value;
const username = usernameInput.value || 'Anonymous';
if (message) {
chatSocket.send(JSON.stringify({
'message': message,
'username': username
}));
messageInput.value = '';
}
};
</script>
</body>
</html>
4. 增加错误处理和重连机制
# chat/consumers.py
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
try:
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f'chat_{self.room_name}'
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
# 发送连接成功消息
await self.send(text_data=json.dumps({
'type': 'connection_established',
'message': 'Connected to chat room'
}))
except Exception as e:
await self.close(code=4000)
async def receive(self, text_data):
try:
text_data_json = json.loads(text_data)
message = text_data_json['message']
username = text_data_json.get('username', 'Anonymous')
# 消息验证
if not message or len(message) > 1000:
raise ValueError('Invalid message')
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat_message',
'message': message,
'username': username
}
)
except json.JSONDecodeError:
await self.send(text_data=json.dumps({
'type': 'error',
'message': 'Invalid JSON format'
}))
except ValueError as e:
await self.send(text_data=json.dumps({
'type': 'error',
'message': str(e)
}))
5. 添加用户在线状态管理
# chat/consumers.py
from channels.db import database_sync_to_async
from django.contrib.auth.models import User
class ChatConsumer(AsyncWebsocketConsumer):
connected_users = {}
@database_sync_to_async
def get_user_info(self):
if self.scope["user"].is_authenticated:
return {
'username': self.scope["user"].username,
'id': self.scope["user"].id
}
return None
async def connect(self):
# ... 之前的连接代码 ...
# 添加用户到在线列表
user_info = await self.get_user_info()
if user_info:
self.connected_users[self.channel_name] = user_info
# 广播用户上线消息
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'user_join',
'username': user_info['username']
}
)
async def disconnect(self, close_code):
# 移除用户从在线列表
if self.channel_name in self.connected_users:
user_info = self.connected_users.pop(self.channel_name)
# 广播用户下线消息
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'user_leave',
'username': user_info['username']
}
)
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def user_join(self, event):
await self.send(text_data=json.dumps({
'type': 'user_join',
'username': event['username']
}))
async def user_leave(self, event):
await self.send(text_data=json.dumps({
'type': 'user_leave',
'username': event['username']
}))
四、性能优化和最佳实践
- Channel Layer优化
# settings.py
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [("localhost", 6379)],
"capacity": 1500, # 默认100
"expiry": 10, # 默认60
},
},
}
- 消息批处理
class ChatConsumer(AsyncWebsocketConsumer):
message_queue = []
async def receive(self, text_data):
# ... 消息处理代码 ...
self.message_queue.append({
'type': 'chat_message',
'message': message,
'username': username
})
if len(self.message_queue) >= 10:
await self.flush_message_queue()
async def flush_message_queue(self):
if self.message_queue:
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'bulk_messages',
'messages': self.message_queue
}
)
self.message_queue = []
五、测试
# chat/tests.py
import pytest
from channels.testing import WebsocketCommunicator
from channels.routing import URLRouter
from django.urls import re_path
from .consumers import ChatConsumer
@pytest.mark.asyncio
async def test_chat_consumer():
application = URLRouter([
re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
])
communicator = WebsocketCommunicator(
application=application,
path='/ws/chat/testroom/'
)
connected, _ = await communicator.connect()
assert connected
# 测试发送消息
await communicator.send_json_to({
'message': 'hello',
'username': 'testuser'
})
response = await communicator.receive_json_from()
assert response['message'] == 'hello'
assert response['username'] == 'testuser'
await communicator.disconnect()
六、总结与进阶建议
今天我们学习了Django Channels的核心概念和实现方法。重点包括:
- WebSocket基础知识
- Consumer的编写和配置
- 消息路由系统
- Channel Layer的使用
- 错误处理和重连机制
- 性能优化
怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!