每天40分玩转Django:Django即时聊天应用实战
Django即时聊天应用实战
一、今日实战内容概览
功能模块 | 核心技术 |
---|---|
聊天室 | Django Channels, WebSocket, Redis |
在线用户列表 | Channel Layers, 实时更新 |
消息持久化 | Django Models, Database |
用户界面 | HTML, CSS, JavaScript |
让我们创建一个完整的即时聊天应用流程图:
二、项目结构
chat_project/
├── chat/
│ ├── __init__.py
│ ├── consumers.py
│ ├── models.py
│ ├── routing.py
│ ├── urls.py
│ ├── views.py
│ └── templates/
│ └── chat/
│ ├── index.html
│ └── room.html
├── static/
│ └── css/
│ └── chat.css
└── chat_project/
├── __init__.py
├── asgi.py
├── settings.py
└── urls.py
三、完整代码实现
1. 模型设计
# chat/models.py
from django.db import models
from django.contrib.auth.models import User
class ChatRoom(models.Model):
    """A chat room identified by a unique name."""

    # Room name; unique across all rooms, at most 100 characters.
    name = models.CharField(unique=True, max_length=100)
    # Filled in automatically when the row is first inserted.
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        """Return the room name as the human-readable label."""
        room_label = self.name
        return room_label
class ChatMessage(models.Model):
    """A single message posted by a user in a chat room."""

    # Deleting the room or the author deletes their messages (CASCADE).
    room = models.ForeignKey(to=ChatRoom, on_delete=models.CASCADE)
    user = models.ForeignKey(to=User, on_delete=models.CASCADE)
    content = models.TextField()
    # Filled in automatically when the row is first inserted.
    timestamp = models.DateTimeField(auto_now_add=True)

    class Meta:
        # Default query order: oldest message first.
        ordering = ['timestamp']

    def __str__(self):
        """Return ``"<username>: <first 50 characters of the content>"``."""
        author = self.user.username
        preview = self.content[:50]
        return f'{author}: {preview}'
2. Consumer实现
# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from .models import ChatRoom, ChatMessage
class ChatConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer for a single chat room.

    On connect the socket joins the channel-layer group ``chat_<room_name>``,
    announces the user to the group and sends the stored history to the new
    client. Incoming ``chat_message`` frames are saved and then broadcast to
    the whole group.

    Frames sent to the client are JSON objects with a ``type`` of
    ``chat_message``, ``chat_history`` or ``user_list_changed``.
    """

    async def connect(self):
        """Admit an authenticated user to the room, or reject the handshake."""
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        # ``user`` is only present when an auth middleware wraps the consumer.
        self.user = self.scope.get('user')
        # Records whether this socket really joined the room, so disconnect()
        # does not announce a "left" event for a rejected connection.
        self.joined = False

        # Messages are stored with a foreign key to a real user, so anonymous
        # connections cannot be served. Closing before accept() rejects the
        # WebSocket handshake.
        if not getattr(self.user, 'is_authenticated', False):
            await self.close()
            return

        # Join the room group.
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        self.joined = True
        # Accept the connection.
        await self.accept()
        # Tell the room that this user is now online.
        await self.update_user_list(True)
        # Send the stored history to the new client.
        await self.send_chat_history()

    async def disconnect(self, close_code):
        """Leave the room group and announce the departure."""
        if not getattr(self, 'joined', False):
            return
        self.joined = False
        # Leave the room group.
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )
        # Tell the remaining members that this user went offline.
        await self.update_user_list(False)

    async def receive(self, text_data=None, bytes_data=None):
        """Handle one frame from the client.

        Only JSON text frames of the form
        ``{"type": "chat_message", "message": "<non-empty string>"}`` are
        processed (``type`` defaults to ``chat_message``). Binary frames,
        invalid JSON and malformed payloads are ignored instead of raising
        inside the consumer.
        """
        if text_data is None:
            return
        try:
            payload = json.loads(text_data)
        except json.JSONDecodeError:
            return
        if not isinstance(payload, dict):
            return
        if payload.get('type', 'chat_message') != 'chat_message':
            return
        message = payload.get('message')
        if not isinstance(message, str) or not message:
            return

        # Save the message to the database.
        timestamp = await self.save_message(message)
        # Broadcast it to the room group, including the stored timestamp so
        # every client shows the same time as the history will later.
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message,
                'username': self.user.username,
                'timestamp': timestamp
            }
        )

    async def chat_message(self, event):
        """Forward a group ``chat_message`` event to this WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'chat_message',
            'message': event['message'],
            'username': event['username'],
            'timestamp': event.get('timestamp', '')
        }))

    @database_sync_to_async
    def save_message(self, message):
        """Store ``message`` for the current user and room.

        The room is created on demand, like the ``room`` view does, so a
        socket opened for a not-yet-stored room does not raise
        ``ChatRoom.DoesNotExist``.

        Returns:
            The saved message's timestamp as an ISO 8601 string.
        """
        room, _created = ChatRoom.objects.get_or_create(name=self.room_name)
        saved = ChatMessage.objects.create(
            room=room,
            user=self.user,
            content=message
        )
        return saved.timestamp.isoformat()

    @database_sync_to_async
    def get_chat_history(self):
        """Return the room's messages as ``(username, content, iso_timestamp)``.

        Filtering through ``room__name`` yields an empty list when the room
        row does not exist, instead of raising. Order follows the model's
        default ordering.
        """
        messages = (
            ChatMessage.objects
            .filter(room__name=self.room_name)
            .select_related('user')
        )
        return [(msg.user.username, msg.content, msg.timestamp.isoformat())
                for msg in messages]

    async def send_chat_history(self):
        """Send the stored history to this client as one ``chat_history`` frame."""
        history = await self.get_chat_history()
        await self.send(text_data=json.dumps({
            'type': 'chat_history',
            'messages': history
        }))

    async def update_user_list(self, joined):
        """Broadcast that the current user joined (True) or left (False)."""
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'user_list_changed',
                'username': self.user.username,
                'joined': joined
            }
        )

    async def user_list_changed(self, event):
        """Forward a group ``user_list_changed`` event to this WebSocket."""
        await self.send(text_data=json.dumps({
            'type': 'user_list_changed',
            'username': event['username'],
            'joined': event['joined']
        }))
3. 路由配置
# chat/routing.py
from django.urls import re_path
from . import consumers
# WebSocket routes: the ``room_name`` group captured here is what
# ChatConsumer reads from scope['url_route']['kwargs'].
websocket_urlpatterns = [
    re_path(
        r'ws/chat/(?P<room_name>\w+)/$',
        consumers.ChatConsumer.as_asgi(),
    ),
]
4. URLs配置
# chat/urls.py
from django.urls import path
from . import views
# URL namespace, so routes are reversed as 'chat:index' and 'chat:room'.
app_name = 'chat'

urlpatterns = [
    # Lobby page.
    path(route='', view=views.index, name='index'),
    # Page for one chat room.
    path(route='room/<str:room_name>/', view=views.room, name='room'),
]
5. 视图实现
# chat/views.py
from django.shortcuts import render, redirect
from django.contrib.auth.decorators import login_required
from .models import ChatRoom, ChatMessage
@login_required
def index(request):
    """Render the lobby page with every chat room (login required)."""
    context = {'rooms': ChatRoom.objects.all()}
    return render(request, 'chat/index.html', context)
@login_required
def room(request, room_name):
    """Render a chat room page, creating the room on first visit.

    Args:
        request: The current HTTP request (login required).
        room_name: Room name captured from the URL.

    Raises:
        Http404: If ``room_name`` could never be served over the WebSocket
            route or does not fit the ``ChatRoom.name`` column.
    """
    import re

    from django.http import Http404

    # The WebSocket route only matches names made of ``\w`` characters, while
    # the ``<str:room_name>`` converter here accepts almost anything. Reject
    # names the socket could never connect to instead of storing dead rooms.
    max_length = ChatRoom._meta.get_field('name').max_length
    if len(room_name) > max_length or not re.fullmatch(r'\w+', room_name):
        raise Http404('Invalid room name')

    chat_room, _created = ChatRoom.objects.get_or_create(name=room_name)
    return render(request, 'chat/room.html', {
        'room': chat_room
    })
6. 模板实现
<!-- chat/templates/chat/room.html -->
<!DOCTYPE html>
{% load static %}
<html>
<head>
    <meta charset="utf-8">
    <title>Chat Room - {{ room.name }}</title>
    <link rel="stylesheet" href="{% static 'css/chat.css' %}">
</head>
<body>
    <div class="chat-container">
        <div class="sidebar">
            <h3>Online Users</h3>
            <div id="online-users"></div>
        </div>
        <div class="main-content">
            <div id="chat-messages"></div>
            <div class="chat-input">
                <input type="text" id="chat-message-input" placeholder="Type a message...">
                <button id="chat-message-submit">Send</button>
            </div>
        </div>
    </div>
    {# json_script emits the values as escaped JSON; writing them straight into the script would produce bare, unquoted tokens and allow script injection. #}
    {{ room.name|json_script:"room-name" }}
    {{ request.user.username|json_script:"user-name" }}
    <script>
        const roomName = JSON.parse(document.getElementById('room-name').textContent);
        const username = JSON.parse(document.getElementById('user-name').textContent);

        // Use wss:// when the page itself is served over HTTPS; browsers block
        // plain ws:// connections from secure pages.
        const wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
        const chatSocket = new WebSocket(
            wsScheme + window.location.host +
            '/ws/chat/' + encodeURIComponent(roomName) + '/'
        );

        const onlineUsers = new Set();
        const messagesDiv = document.querySelector('#chat-messages');
        const onlineUsersDiv = document.querySelector('#online-users');
        const messageInput = document.querySelector('#chat-message-input');
        const submitButton = document.querySelector('#chat-message-submit');

        chatSocket.onmessage = function(e) {
            const data = JSON.parse(e.data);

            switch (data.type) {
                case 'chat_message':
                    // An empty or missing timestamp falls back to "now".
                    appendMessage(data.username, data.message, data.timestamp);
                    break;
                case 'chat_history':
                    // Each entry is [username, content, ISO timestamp].
                    data.messages.forEach(msg => {
                        appendMessage(msg[0], msg[1], msg[2]);
                    });
                    break;
                case 'user_list_changed':
                    if (data.joined) {
                        onlineUsers.add(data.username);
                    } else {
                        onlineUsers.delete(data.username);
                    }
                    updateOnlineUsers();
                    break;
            }
        };

        chatSocket.onclose = function() {
            console.error('Chat socket closed unexpectedly');
        };

        // Build a DOM element whose text is set with textContent, so
        // user-supplied text is never parsed as HTML.
        function makeElement(tag, className, text) {
            const el = document.createElement(tag);
            if (className) {
                el.className = className;
            }
            el.textContent = text;
            return el;
        }

        function appendMessage(username, message, timestamp = null) {
            const messageDiv = document.createElement('div');
            messageDiv.className = 'message';

            const time = timestamp ? new Date(timestamp).toLocaleTimeString() : new Date().toLocaleTimeString();

            messageDiv.appendChild(makeElement('span', 'username', username));
            messageDiv.appendChild(makeElement('span', 'time', time));
            messageDiv.appendChild(makeElement('p', '', message));

            messagesDiv.appendChild(messageDiv);
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }

        function updateOnlineUsers() {
            onlineUsersDiv.textContent = '';
            onlineUsers.forEach(user => {
                onlineUsersDiv.appendChild(makeElement('div', 'user', user));
            });
        }

        messageInput.focus();
        messageInput.onkeyup = function(e) {
            if (e.key === 'Enter') {
                submitButton.click();
            }
        };

        submitButton.onclick = function(e) {
            const message = messageInput.value;
            // Calling send() before the socket is open throws, so skip until then.
            if (message && chatSocket.readyState === WebSocket.OPEN) {
                chatSocket.send(JSON.stringify({
                    'type': 'chat_message',
                    'message': message
                }));
                messageInput.value = '';
            }
        };
    </script>
</body>
</html>
7. 样式实现
/* static/css/chat.css */
/* Page layout: sidebar and main column side by side, filling the viewport. */
.chat-container {
    display: flex;
    height: 100vh;
    padding: 20px;
    box-sizing: border-box;
}
/* Fixed-width column that holds the online-user list. */
.sidebar {
    width: 200px;
    background: #f5f5f5;
    padding: 15px;
    margin-right: 20px;
    border-radius: 5px;
}
/* Main column: message list stacked above the input row. */
.main-content {
    flex: 1;
    display: flex;
    flex-direction: column;
}
/* Message list: takes the remaining height and scrolls vertically. */
#chat-messages {
    flex: 1;
    overflow-y: auto;
    padding: 15px;
    background: #fff;
    border: 1px solid #ddd;
    border-radius: 5px;
    margin-bottom: 15px;
}
/* Input row: text field and send button on one line. */
.chat-input {
    display: flex;
    gap: 10px;
}
/* Text field stretches to fill the input row. */
#chat-message-input {
    flex: 1;
    padding: 10px;
    border: 1px solid #ddd;
    border-radius: 5px;
}
/* Send button. */
#chat-message-submit {
    padding: 10px 20px;
    background: #007bff;
    color: white;
    border: none;
    border-radius: 5px;
    cursor: pointer;
}
/* One chat message card. */
.message {
    margin-bottom: 15px;
    padding: 10px;
    background: #f9f9f9;
    border-radius: 5px;
}
/* Author name inside a message. */
.message .username {
    font-weight: bold;
    color: #007bff;
    margin-right: 10px;
}
/* Time label inside a message. */
.message .time {
    color: #666;
    font-size: 0.8em;
}
/* One entry in the online-user list. */
.user {
    padding: 5px;
    margin: 5px 0;
    background: #fff;
    border-radius: 3px;
}
四、性能优化
- 消息分页加载:
@database_sync_to_async
def get_chat_history(self, limit=50):
    """Return the newest ``limit`` messages of the room, oldest first.

    Args:
        limit: Maximum number of messages to load (default 50).

    Returns:
        A list of ``(username, content, iso_timestamp)`` tuples. This is the
        JSON-serialisable shape that ``send_chat_history`` passes to
        ``json.dumps``; model instances would fail there.
    """
    # Django refuses to call .reverse() on a sliced queryset (TypeError), so
    # fetch the newest rows with the slice and flip them in Python instead.
    newest_first = list(
        ChatMessage.objects
        .filter(room__name=self.room_name)
        .select_related('user')
        .order_by('-timestamp')[:limit]
    )
    return [
        (msg.user.username, msg.content, msg.timestamp.isoformat())
        for msg in reversed(newest_first)
    ]
- Redis Channel Layer 容量与过期时间配置(capacity / expiry):
# settings.py
# Channel layer backed by Redis on localhost:6379.
# NOTE(review): per the channels_redis docs, "capacity" caps the messages
# queued per channel and "expiry" is the message lifetime in seconds; confirm
# against the installed version.
CHANNEL_LAYERS = dict(
    default=dict(
        BACKEND="channels_redis.core.RedisChannelLayer",
        CONFIG=dict(
            hosts=[("localhost", 6379)],
            capacity=1500,
            expiry=10,
        ),
    ),
)
- 消息批处理:
class ChatConsumer(AsyncWebsocketConsumer):
    """Chat consumer variant that writes messages to the database in batches.

    Only the batching-related members are shown; the rest of the consumer is
    unchanged.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # One buffer per connection. A class-level ``message_buffer = []``
        # would be a single list shared by every consumer instance, mixing
        # messages from different users and rooms.
        self.message_buffer = []

    async def receive(self, text_data=None, bytes_data=None):
        # ... existing code (must bind ``message`` from the incoming frame) ...
        self.message_buffer.append(message)
        if len(self.message_buffer) >= 10:
            await self.flush_messages()

    async def flush_messages(self):
        """Persist every buffered message and empty the buffer.

        NOTE: ``disconnect()`` should also await this method, otherwise up to
        nine buffered messages are lost when the socket closes.
        """
        if not self.message_buffer:
            return
        # Swap the buffer out before awaiting, so messages appended while the
        # save is in progress go into a fresh list.
        messages, self.message_buffer = self.message_buffer, []
        # Save the whole batch in one go.
        await self.save_messages(messages)

    @database_sync_to_async
    def save_messages(self, messages):
        """Insert ``messages`` (a list of strings) for the current user and room."""
        room, _created = ChatRoom.objects.get_or_create(name=self.room_name)
        # NOTE(review): rows in one batch get near-identical timestamps, so
        # ordering by timestamp alone may not preserve send order; confirm.
        ChatMessage.objects.bulk_create([
            ChatMessage(room=room, user=self.user, content=text)
            for text in messages
        ])
五、单元测试
# chat/tests.py
import pytest
from channels.db import database_sync_to_async
from channels.routing import URLRouter
from channels.testing import WebsocketCommunicator
from django.contrib.auth.models import User

from chat.models import ChatRoom, ChatMessage
from chat.routing import websocket_urlpatterns
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_chat_consumer():
    """A sent chat message is broadcast back and stored in the database.

    Assumes the test settings configure a channel layer that works without
    external services (for example Channels' in-memory layer).
    """

    async def receive_of_type(comm, expected_type):
        # The consumer also sends chat_history / user_list_changed frames;
        # skip those. receive_json_from() times out if nothing arrives.
        while True:
            payload = await comm.receive_json_from()
            if payload['type'] == expected_type:
                return payload

    # Create the test user and room.
    user = await database_sync_to_async(User.objects.create_user)(
        username='testuser',
        password='testpass'
    )
    room = await database_sync_to_async(ChatRoom.objects.create)(
        name='testroom'
    )

    # Create the WebSocket communicator.
    application = URLRouter(websocket_urlpatterns)
    communicator = WebsocketCommunicator(
        application=application,
        path='/ws/chat/testroom/'
    )
    # No auth middleware wraps the router here, so supply the user that the
    # consumer reads from scope['user'].
    communicator.scope['user'] = user

    # Connect.
    connected, _ = await communicator.connect()
    assert connected

    # Send a message.
    await communicator.send_json_to({
        'type': 'chat_message',
        'message': 'Hello, World!'
    })

    # Receive the broadcast.
    response = await receive_of_type(communicator, 'chat_message')
    assert response['message'] == 'Hello, World!'
    assert response['username'] == 'testuser'

    # Disconnect.
    await communicator.disconnect()

    # The message must have been saved. Wrap the bound ``count`` method so the
    # query itself runs in the sync thread.
    saved = await database_sync_to_async(
        ChatMessage.objects.filter(room=room).count
    )()
    assert saved == 1
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_user_list_updates():
    """Join and leave events are delivered as ``user_list_changed`` frames.

    Assumes the test settings configure a channel layer that works without
    external services (for example Channels' in-memory layer).
    """

    async def receive_of_type(comm, expected_type):
        # Skip unrelated frames such as chat_history; receive_json_from()
        # times out if nothing arrives.
        while True:
            payload = await comm.receive_json_from()
            if payload['type'] == expected_type:
                return payload

    # Create two test users and the room they will join.
    user1 = await database_sync_to_async(User.objects.create_user)(
        username='user1',
        password='testpass'
    )
    user2 = await database_sync_to_async(User.objects.create_user)(
        username='user2',
        password='testpass'
    )
    await database_sync_to_async(ChatRoom.objects.get_or_create)(
        name='testroom'
    )

    # Create one communicator per user; no auth middleware wraps the router,
    # so supply scope['user'] by hand.
    application = URLRouter(websocket_urlpatterns)
    communicator1 = WebsocketCommunicator(
        application=application,
        path='/ws/chat/testroom/'
    )
    communicator1.scope['user'] = user1
    communicator2 = WebsocketCommunicator(
        application=application,
        path='/ws/chat/testroom/'
    )
    communicator2.scope['user'] = user2

    # User 1 connects and sees their own join event.
    connected1, _ = await communicator1.connect()
    assert connected1
    response1 = await receive_of_type(communicator1, 'user_list_changed')
    assert response1['username'] == 'user1'
    assert response1['joined'] is True

    # User 2 connects and sees their own join event.
    connected2, _ = await communicator2.connect()
    assert connected2
    response2 = await receive_of_type(communicator2, 'user_list_changed')
    assert response2['username'] == 'user2'
    assert response2['joined'] is True

    # User 1 disconnects; user 2 is told that user 1 left.
    await communicator1.disconnect()
    response2 = await receive_of_type(communicator2, 'user_list_changed')
    assert response2['username'] == 'user1'
    assert response2['joined'] is False

    await communicator2.disconnect()
六、部署注意事项
- ASGI服务器配置
# asgi.py
import os

from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'chat_project.settings')

# Initialise Django first. chat.routing imports the consumers, which import
# ORM models, and that fails unless settings are configured and the app
# registry is populated.
django_asgi_app = get_asgi_application()

from channels.auth import AuthMiddlewareStack  # noqa: E402
from channels.routing import ProtocolTypeRouter, URLRouter  # noqa: E402
from channels.security.websocket import AllowedHostsOriginValidator  # noqa: E402

from chat.routing import websocket_urlpatterns  # noqa: E402

application = ProtocolTypeRouter({
    # Ordinary HTTP requests go to the regular Django application.
    "http": django_asgi_app,
    # WebSocket handshakes must come from an Origin listed in ALLOWED_HOSTS;
    # AuthMiddlewareStack then fills scope['user'] from the session.
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                websocket_urlpatterns
            )
        )
    ),
})
- Daphne服务器启动
daphne -b 0.0.0.0 -p 8000 chat_project.asgi:application
- Nginx配置
# Ask the backend for a protocol upgrade only when the client requested one;
# ordinary HTTP requests get "Connection: close" instead of a forced "upgrade".
map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

upstream channels-backend {
    server localhost:8000;
}

server {
    listen 80;
    server_name example.com;

    location / {
        proxy_pass http://channels-backend;

        # WebSocket upgrade needs HTTP/1.1 and the hop-by-hop headers forwarded.
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;

        # Pass the original host, client address and scheme to Django.
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # nginx closes a proxied connection after 60s without data by default,
        # which would drop idle chat sockets.
        proxy_read_timeout 3600s;
    }
}
七、进阶功能建议
- 添加私聊功能:
- 创建私聊房间
- 一对一消息发送
- 未读消息提醒
- 消息富文本支持:
- Markdown格式
- 表情支持
- 图片上传
- 房间管理功能:
- 房间创建/删除
- 成员管理
- 权限控制
- 消息搜索功能:
- 全文搜索
- 按时间筛选
- 按用户筛选
八、总结
今天我们实现了一个完整的即时聊天应用,包含以下核心功能:
- 实时消息发送和接收
- 在线用户列表管理
- 消息持久化存储
- 聊天历史记录
- 用户认证和授权
实现过程中的关键点:
- WebSocket连接管理
- Channel Layer消息广播
- 数据库操作异步化
- 前端实时更新
- 性能优化
建议练习:
- 添加消息编辑和删除功能
- 实现文件上传和共享
- 添加用户输入状态提示
- 实现消息已读确认
- 添加群组管理功能
通过这个项目,你应该更好地理解了Django Channels的工作原理和实时应用的开发流程。
怎么样,今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!