当前位置: 首页 > article >正文

基于Pycharm与数据库的新闻管理系统(2)Redis

pip3 install redis

1.创建 Redis 连接池

文件地址:db/redis_db.py

代码尝试连接到本地运行的 Redis 服务器,并设置数据库为 1,最大连接数为 20。

如果连接过程中出现任何异常,它会捕获异常并打印错误信息。

import redis
try:
    pool = redis.ConnectionPool(
        host='localhost',
        port=6379,
        db=1,
        max_connections=20
    )
except Exception as e:
    print(e)

2.查找用于缓存的记录

文件地址:db/news_dao.py

用 sql 语言在 Navicat 中寻找要作为缓存的记录,

目的是将审批通过的信息,存进 Redis 数据库中。

    #查找用于缓存的记录
    def search_cache(self, id):
        # 创建异常
        try:
            # 获得连接项
            conn = pool.get_connection()
            # 获得游标
            cursor = conn.cursor()
            # 创建sql
            sql = """
               SELECT n.title,u.username,t.type,n.content_id,n.is_top,n.create_time
                FROM t_news n join t_type t on n.type_id=t.id
                join t_user u on n.editor_id=u.id
                where n.id=%s
            """
            # 执行sql
            cursor.execute(sql, [id])
            # 获得查询结果
            result = cursor.fetchone()
            return result
        # 返回获得结果
        except Exception as e:
            print(e)
        finally:
            if "conn" in dir():
                conn.close()

3.操作Redis数据库

3.1 创建操作类

文件地址:db/redis_news_dao.py

创建一个 Python 类 RedisNewsDao,它包含两个方法:insertdelete

分别用于添加与删除 Redis 数据库中的新闻数据。

3.1.1 添加数据

from db.redis_db import pool
import redis
class RedisNewsDao:
    #添加数据
    def insert(self,id,title,username,type,content,is_top,create_time):
        conn = redis.Redis(
            connection_pool=pool,
        )
        try:
            #向redis中存放数据
            conn.hmset(id,{
                'title':title,
                'author':username,
                'type':type,
                'content':content,
                'is_top':is_top,
                'create_time':create_time
            })
            #如果是今日热文则保留一天后销毁
            if is_top == 0 :
                conn.expire(id,24*60*60)
        except Exception as e:
            print(e)
        finally:
            del conn
    #删除数据
    def delete(self,id):
        conn = redis.Redis(
            connection_pool=pool
        )
        try:
            conn.delete(id)
        except Exception as e:
            print(e)
        finally:
            del conn

3.1.2 删除数据 

from db.redis_db import pool
import redis
class RedisNewsDao:
    #删除数据
    def delete(self,id):
        conn = redis.Redis(
            connection_pool=pool
        )
        try:
            conn.delete(id)
        except Exception as e:
            print(e)
        finally:
            del conn

3.2 创建业务类

文件地址:service/news_service.py

from db.news_dao import NewsDao
from db.redis_news_dao import RedisNewsDao
class NewsService:
    #实例化新闻Dao对象
    __news_dao = NewsDao()
    __redis_news_dao = RedisNewsDao()
    #查找用户管理中的记录
    def search_cache(self,id):
        result = self.__news_dao.search_cache(id)
        return result
    #向Redis中保存缓存的新闻
    def cache_news(self,id,title,username,type,content,is_top,create_time):
        self.__redis_news_dao.insert(id,title,username,type,content,is_top,create_time)
    #删除缓存新闻
    def delete_cache(self,id):
        self.__redis_news_dao.delete(id)

4.更改Navicat与Redis数据库 

4.1 创建操作类

# 删除新闻
def delete_by_id(self, id):
    try:
        conn = pool.get_connection()
        conn.start_transaction()
        cursor = conn.cursor()
        sql = "delete from t_news where id=%s"
        cursor.execute(sql, [id])
        conn.commit()
    except Exception as e:
        print(e)
        if "conn" in dir():
            conn.rollback()
    finally:
        if "conn" in dir():
            conn.close()

# 添加新闻
def insert_news(self, title, editor_id, type_id, content_id, is_top):
    try:
        conn = pool.get_connection()
        conn.start_transaction()
        cursor = conn.cursor()
        sql = """
            insert into t_news(title,editor_id,type_id,content_id,is_top,state) 
            values (%s,%s,%s,%s,%s,%s)
        """
        cursor.execute(sql, (title, editor_id, type_id, content_id, is_top, "待审批"))
        conn.commit()
    except Exception as e:
        print(e)
        if "conn" in dir():
            conn.rollback()
    finally:
        if "conn" in dir():
            conn.close()

# 查找用于缓存的记录
def search_cache(self, id):
    # 创建异常
    try:
        # 获得连接项
        conn = pool.get_connection()
        # 获得游标
        cursor = conn.cursor()
        # 创建sql
        sql = """
           SELECT n.title,u.username,t.type,n.content_id,n.is_top,n.create_time
            FROM t_news n join t_type t on n.type_id=t.id
            join t_user u on n.editor_id=u.id
            where n.id=%s
        """
        # 执行sql
        cursor.execute(sql, [id])
        # 获得查询结果
        result = cursor.fetchone()
        return result
    # 返回获得结果
    except Exception as e:
        print(e)
    finally:
        if "conn" in dir():
            conn.close()

# 修改id查找新闻
def search_by_id(self, id):
    # 创建异常
    try:
        # 获得连接项
        conn = pool.get_connection()
        # 获得游标
        cursor = conn.cursor()
        # 创建sql
        sql = """
           SELECT n.title,t.type,n.is_top
            FROM t_news n join t_type t on n.type_id=t.id
            where n.id=%s
        """
        # 执行sql
        cursor.execute(sql, [id])
        # 获得查询结果
        result = cursor.fetchone()
        return result
    # 返回获得结果
    except Exception as e:
        print(e)
    finally:
        if "conn" in dir():
            conn.close()

# 更新修改新闻
def update(self, id, title, type_id, content_id, is_top):
    try:
        conn = pool.get_connection()
        conn.start_transaction()
        cursor = conn.cursor()
        sql = "update t_news set title=%s,type_id=%s,content_id=%s,is_top=%s,state=%s,update_time=now() where id=%s"
        cursor.execute(sql, (title, type_id, content_id, is_top, "待审批", id))
        conn.commit()
    except Exception as e:
        print(e)
        if "conn" in dir():
            conn.rollback()
    finally:
        if "conn" in dir():
            conn.close()

4.2  创建操作类

#删除新闻
def delete_news(self,id):
    self.__news_dao.delete_by_id(id)
#添加新闻
def insert_news(self,title,editor_id,type_id,content_id,is_top):
    self.__news_dao.insert_news(title, editor_id, type_id, content_id, is_top)
#查找用户管理中的记录
def search_cache(self,id):
    result = self.__news_dao.search_cache(id)
    return result
#向Redis中保存缓存的新闻
def cache_news(self,id,title,username,type,content,is_top,create_time):
    self.__redis_news_dao.insert(id,title,username,type,content,is_top,create_time)
#删除缓存新闻
def delete_cache(self,id):
    self.__redis_news_dao.delete(id)
#根据id查询新闻信息
def search_by_id(self,id):
    result = self.__news_dao.search_by_id(id)
    return result
#更新修改新闻信息
def update(self,id,title,type_id,content_id,is_top):
    self.__news_dao.update(id,title,type_id,content_id,is_top)
    self.delete_cache(id)

5.实现Redis操作

文件地址:app_py.py

5.1 添加数据

opt = input("\n\t请输入操作编号")  # 等待控制台输入
if opt == "prev" and page > 1:
    page += -1
elif opt == "next" and page < count_page:
    page += 1
elif opt == "back":
    break
elif int(opt) >= 1 and int(opt) <= 5:  # 由于控制台获得的是字符串类型,需要转换成数字类型进行比较
    # 获得新闻id值
    news_id = result[int(opt) - 1][0]  # 获得对应行数及列
    # 调用news_service的审批函数
    __news_service.update_unreview_news(news_id)
    # 新闻缓存 👇
    result = __news_service.search_cache(news_id)
    title = result[0]
    username = result[1]
    type = result[2]
    content_id = result[3]
    # 查询新闻正文
    content = "100"
    is_top = result[4]
    create_time = str(result[5])
    __news_service.cache_news(news_id, title, username, type, content, is_top, create_time)
              👆

5.2 删除数据

opt = input("\n\t请输入操作编号")  # 等待控制台输入
if opt == "prev" and page > 1:
    page += -1
elif opt == "next" and page < count_page:
    page += 1
elif opt == "back":
    break👇
elif int(opt) >=1 and int(opt) <=5 :
    news_id = result[int(opt) - 1][0]
    result = __news_service.search_by_id(news_id)
    title = result[0]
    type = result[1]
    is_top = result[2]
    print("\n\t新闻原标题:%s"%(title))
    new_title = input("\n\t新标题")
    print("\n\t新闻原类型:%s"%(type))
    result = __type_service.search_list()
    for index in range(len(result)):
        t = result[index]
        print(Fore.LIGHTBLUE_EX,"\n\t%d.%s"%(index+1,t[1]))
    print(Style.RESET_ALL)
    opt = input("\n\t类型编号:")
    new_type = result[int(opt) - 1][0]
    content_id = 10
    print("原置顶级别%s"%(is_top))
    new_is_top = input("\n\t置顶级别(0-5):")
    is_commit = input("\n\t是否提交?(Y/N)")
    if is_commit == 'y' or is_commit == 'Y':
        __news_service.update(news_id,new_title, new_type, content_id, new_is_top)
        print("\n\t保存成功(3秒自动返回)")
        time.sleep(3)
           👆

5.3 更改数据

print(Fore.LIGHTGREEN_EX," \n\t1.发表新闻")
print(Fore.LIGHTGREEN_EX, "\n\t2.编辑新闻")
print(Fore.LIGHTGREEN_EX, "\n\t3.退出登录")
print(Fore.LIGHTGREEN_EX, "\n\t4.退出系统")
print(Style.RESET_ALL)
opt = input("\n\t输入操作编号")
if opt == "1":...
elif opt == "2":
page = 1
while True:
    os.system("cls")
    count_page = __news_service.search_count_page()
    result = __news_service.search_list(page)
    for index in range(len(result)):
        new = result[index]
        print(Fore.LIGHTBLUE_EX, "\n\t%d\t%s\t%s\t%s\t" % (index + 1, new[1], new[2], new[3]))
        # 输入在新闻列表中一共有多少页,当前多少页
    print(Fore.LIGHTBLUE_EX, "\n\t%d/%d" % (page, count_page))
    print(Fore.LIGHTBLUE_EX, "\n\t------------------")
    print(Fore.LIGHTBLUE_EX, "\n\tprev.上一页")
    print(Fore.LIGHTBLUE_EX, "\n\tnext.下一页")
    print(Fore.LIGHTBLUE_EX, "\n\tback.返回上一层")
    print(Style.RESET_ALL)
    opt = input("\n\t请输入操作编号")  # 等待控制台输入
    if opt == "prev" and page > 1:
        page += -1
    elif opt == "next" and page < count_page:
        page += 1
    elif opt == "back":
        break
    elif int(opt) >= 1 and int(opt) <= 5:
        news_id = result[int(opt) - 1][0]
        result = __news_service.search_by_id(news_id)
        title = result[0]
        type = result[1]
        is_top = result[2]
        print("\n\t新闻原标题:%s" % (title))
        new_title = input("\n\t新标题")
        print("\n\t新闻原类型:%s" % (type))
        result = __type_service.search_list()
        for index in range(len(result)):
            t = result[index]
            print(Fore.LIGHTBLUE_EX, "\n\t%d.%s" % (index + 1, t[1]))
        print(Style.RESET_ALL)
        opt = input("\n\t类型编号:")
        new_type = result[int(opt) - 1][0]
        content_id = 10
        print("原置顶级别%s" % (is_top))
        new_is_top = input("\n\t置顶级别(0-5):")
        is_commit = input("\n\t是否提交?(Y/N)")
        if is_commit == 'y' or is_commit == 'Y':
            __news_service.update(news_id, new_title, new_type, content_id, new_is_top)
            print("\n\t保存成功(3秒自动返回)")
            time.sleep(3)

http://www.kler.cn/a/457216.html

相关文章:

  • LockSupport的源码实现原理(一)
  • C++ 设计模式:享元模式(Flyweight Pattern)
  • PHP 中的魔术常量
  • vscode-QT环境配置
  • 常用的linux命令介绍
  • 在Linux的世界中怎么玩转定时器任务
  • 内网渗透思路amp;相关服务爆破以及提权
  • SpringBoot使用外置的Servlet容器(详细步骤)
  • CH340系列芯片驱动电路·CH340系列芯片驱动!!!
  • 鸿蒙next之如何实现防截屏功能
  • 在 CentOS 系统上安装 ClickHouse
  • Es搭建——单节点——Linux
  • 【FPGA】ISE13.4操作手册,新建工程示例
  • 嵌入式学习-硬件基础-Day02
  • 深入理解 Cookie 和 Session 在 Java Web 中的应用
  • Unity 实现Canvas显示3D物体
  • 18_HTML5 Web IndexedDB 数据库 --[HTML5 API 学习之旅]
  • 神经网络-Inception
  • vscode vue文件 点击ctrl没有跳转到有@路径的自定义组件
  • React Diffing 算法完整指南
  • 精读DeepSeek v3技术文档的心得感悟
  • ensp 关于ARRP 的讲解 配置
  • 【WSL】Ubuntu 24.04 安装配置docker
  • Lua语言的计算机基础
  • 基于aspose.words组件的word bytes转pdf bytes,去除水印和解决linux中文乱码问题
  • EsChatPro 接入国内 DeepSeek 大模型