当前位置: 首页 > article >正文

Python基础语法(17多线程线程锁单例模式)

  Python基础语法文章导航:

  1. Python基础(01初识数据类型&变量)
  2. Python基础(02条件&循环语句)
  3. Python基础(03字符串格式化&运算符&进制&编码)
  4. Python基础(04 基础练习题)
  5. Python数据类型(day05整型&布尔类型&字符串类型)
  6. Python数据类型(06列表&元组)
  7. Python数据类型(07集合&字典&浮点型&None)
  8. Python文件操作01(自动化测试文件相关操作)
  9. Python函数入门(08函数定义&参数&返回值)

  10. Python文件操作02(自动化测试文件相关操作)

  11. Python函数(10生成器&内置函数&推导式)

  12. Python函数(11自定义模块&第三方模块&内置模块)

  13. Python函数(12时间处理&正则表达式)

  14. Python函数(13面向对象)

  15. Python面向对象(15成员&成员修饰符)

  16. Python函数(16进程和线程)

目录

一.多线程开发

1. t.start(),当前线程准备就绪(等待CPU调度,具体时间是由CPU来决定)。

2. t.join(),等待当前线程的任务执行完毕后再向下继续执行。

3. t.setDaemon(布尔值) ,守护线程(必须放在start之前)

二.线程安全

三.线程锁

1.Lock,同步锁

2.RLock,递归锁

四.死锁

五.线程池

六.单例模式(扩展)


一.多线程开发

import threading

def task(arg):
	pass


# 创建一个Thread对象(线程),并封装线程被CPU调度时应该执行的任务和相关参数。
t = threading.Thread(target=task,args=('xxx',))
# 线程准备就绪(等待CPU调度),代码继续向下执行。
t.start()

print("继续执行...") # 主线程执行完所有代码,不结束(等待子线程)

线程的常见方法:

1. t.start(),当前线程准备就绪(等待CPU调度,具体时间是由CPU来决定)。

import threading

loop = 10000000
number = 0

def _add(count):
    global number
    for i in range(count):
        number += 1

t = threading.Thread(target=_add,args=(loop,))
t.start()

print(number)

2. t.join(),等待当前线程的任务执行完毕后再向下继续执行。

import threading

number = 0

def _add():
    global number
    for i in range(10000000):
        number += 1

t = threading.Thread(target=_add)
t.start()

t.join() # 主线程等待中...

print(number)
import threading
number = 0
def _add():
    global number
    for i in range(10000000):
        number += 1
def _sub():
    global number
    for i in range(10000000):
        number -= 1
t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join()  # t1线程执行完毕,才继续往后走
t2.start()
t2.join()  # t2线程执行完毕,才继续往后走
print(number)
import threading
loop = 10000000
number = 0
def _add(count):
    global number
    for i in range(count):
        number += 1
def _sub(count):
    global number
    for i in range(count):
        number -= 1
t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join()  # t1线程执行完毕,才继续往后走
t2.join()  # t2线程执行完毕,才继续往后走
print(number)

3. t.setDaemon(布尔值) ,守护线程(必须放在start之前)

  • t.setDaemon(True),设置为守护线程,主线程执行完毕后,子线程也自动关闭。

  • t.setDaemon(False),设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束。(默认)

import threading
import time

def task(arg):
    time.sleep(5)
    print('任务')

t = threading.Thread(target=task, args=(11,))
t.setDaemon(True) # True/False
t.start()

print('END')

 线程名称的设置和获取

import threading
def task(arg):
    # 获取当前执行此代码的线程
    name = threading.current_thread().getName()
    print(name)
for i in range(10):
    t = threading.Thread(target=task, args=(11,))
    t.setName('日魔-{}'.format(i))
    t.start()

 自定义线程类,直接将线程需要做的事写到run方法中。

import threading
class MyThread(threading.Thread):
    def run(self):
        print('执行此线程', self._args)
t = MyThread(args=(100,))
t.start()
import requests
import threading


class DouYinThread(threading.Thread):
    def run(self):
        file_name, video_url = self._args
        res = requests.get(video_url)
        with open(file_name, mode='wb') as f:
            f.write(res.content)


url_list = [
    ("东北F4模仿秀.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0300f570000bvbmace0gvch7lo53oog"),
    ("卡特扣篮.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f3e0000bv52fpn5t6p007e34q1g"),
    ("罗斯mvp.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f240000buuer5aa4tij4gv6ajqg")
]
for item in url_list:
    t = DouYinThread(args=(item[0], item[1]))
    t.start()

二.线程安全

一个进程中可以有多个线程,且线程共享所有进程中的资源。

多个线程同时去操作一个"东西",可能会存在数据混乱的情况,例如:

示例1:

import threading

loop = 10000000
number = 0


def _add(count):
    global number
    for i in range(count):
        number += 1


def _sub(count):
    global number
    for i in range(count):
        number -= 1


t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()

t1.join()  # t1线程执行完毕,才继续往后走
t2.join()  # t2线程执行完毕,才继续往后走

print(number) #-4807368
import threading

lock_object = threading.RLock()

loop = 10000000
number = 0


def _add(count):
    lock_object.acquire() # 加锁
    global number
    for i in range(count):
        number += 1
    lock_object.release() # 释放锁


def _sub(count):
    lock_object.acquire() # 申请锁(等待)
    global number
    for i in range(count):
        number -= 1
    lock_object.release() # 释放锁


t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()

t1.join()  # t1线程执行完毕,才继续往后走
t2.join()  # t2线程执行完毕,才继续往后走

print(number)  3import threading

lock_object = threading.RLock()

loop = 10000000
number = 0


def _add(count):
    lock_object.acquire() # 加锁
    global number
    for i in range(count):
        number += 1
    lock_object.release() # 释放锁


def _sub(count):
    lock_object.acquire() # 申请锁(等待)
    global number
    for i in range(count):
        number -= 1
    lock_object.release() # 释放锁


t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()

t1.join()  # t1线程执行完毕,才继续往后走
t2.join()  # t2线程执行完毕,才继续往后走

print(number)  #0

示例2:

import threading
num = 0

def task():
    global num
    for i in range(1000000):
        num += 1
    print(num)

for i in range(2):
    t = threading.Thread(target=task)
    t.start()
# 805594
# 1072361
import threading

num = 0
lock_object = threading.RLock()


def task():
    print("开始")
    lock_object.acquire()  # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # 线程出去,并解开锁,其他线程就可以进入并执行了
    print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()
# 开始
# 开始
# 1000000
# 2000000
import threading

num = 0
lock_object = threading.RLock()


def task():
    print("开始")
    with lock_object: # 基于上下文管理,内部自动执行 acquire 和 release
        global num
        for i in range(1000000):
            num += 1
    print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()
    
# 开始
# 开始
# 1000000
# 2000000

三.线程锁

在程序中如果想要自己手动加锁,一般有两种:Lock 和 RLock。

1.Lock,同步锁

import threading

num = 0
lock_object = threading.Lock()


def task():
    print("开始")
    lock_object.acquire()  # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # 线程出去,并解开锁,其他线程就可以进入并执行了

    print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()
    
# 开始
# 开始
# 1000000
# 2000000

2.RLock,递归锁

import threading

num = 0
lock_object = threading.RLock()


def task():
    print("开始")
    lock_object.acquire()  # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # 线程出去,并解开锁,其他线程就可以进入并执行了
    print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()  
# 开始
# 开始
# 1000000
# 2000000

RLock支持多次申请锁和多次释放;Lock不支持。例如:

import threading
import time

lock_object = threading.RLock()


def task():
    print("开始")
    lock_object.acquire()
    lock_object.acquire()
    print(123)
    lock_object.release()
    lock_object.release()


for i in range(3):
    t = threading.Thread(target=task)
    t.start()
import threading
lock = threading.RLock()

# 程序员A开发了一个函数,函数可以被其他开发者调用,内部需要基于锁保证数据安全。
def func():
	with lock:
		pass
        
# 程序员B开发了一个函数,可以直接调用这个函数。
def run():
    print("其他功能")
    func() # 调用程序员A写的func函数,内部用到了锁。
    print("其他功能")
    
# 程序员C开发了一个函数,自己需要加锁,同时也需要调用func函数。
def process():
    with lock:
		print("其他功能")
        func() # ----------------> 此时就会出现多次锁的情况,只有RLock支持(Lock不支持)。
		print("其他功能")

四.死锁

死锁,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象。

import threading

num = 0
lock_object = threading.Lock()


def task():
    print("开始")
    lock_object.acquire()  # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
    lock_object.acquire()  # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # 线程出去,并解开锁,其他线程就可以进入并执行了
    lock_object.release()  # 线程出去,并解开锁,其他线程就可以进入并执行了
    
    print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()
import threading
import time 

lock_1 = threading.Lock()
lock_2 = threading.Lock()


def task1():
    lock_1.acquire()
    time.sleep(1)
    lock_2.acquire()
    print(11)
    lock_2.release()
    print(111)
    lock_1.release()
    print(1111)


def task2():
    lock_2.acquire()
    time.sleep(1)
    lock_1.acquire()
    print(22)
    lock_1.release()
    print(222)
    lock_2.release()
    print(2222)


t1 = threading.Thread(target=task1)
t1.start()

t2 = threading.Thread(target=task2)
t2.start()

五.线程池

Python3中官方才正式提供线程池。

线程不是开的越多越好,开的多了可能会导致系统的性能更低了,例如:如下的代码是不推荐在项目开发中编写。

不建议:无限制的创建线程。

import threading


def task(video_url):
    pass

url_list = ["www.xxxx-{}.com".format(i) for i in range(30000)]

for url in url_list:
    t = threading.Thread(target=task, args=(url,))
    t.start()

# 这种每次都创建一个线程去操作,创建任务的太多,线程就会特别多,可能效率反倒降低了。

建议:使用线程池

示例1:

import time
from concurrent.futures import ThreadPoolExecutor

# pool = ThreadPoolExecutor(100)
# pool.submit(函数名,参数1,参数2,参数...)


def task(video_url,num):
    print("开始执行任务", video_url)
    time.sleep(5)

# 创建线程池,最多维护10个线程。
pool = ThreadPoolExecutor(10)

url_list = ["www.xxxx-{}.com".format(i) for i in range(300)]

for url in url_list:
    # 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。
    pool.submit(task, url,2)
    
print("END")

示例2:等待线程池的任务执行完毕。

import time
from concurrent.futures import ThreadPoolExecutor


def task(video_url):
    print("开始执行任务", video_url)
    time.sleep(5)


# 创建线程池,最多维护10个线程。
pool = ThreadPoolExecutor(10)

url_list = ["www.xxxx-{}.com".format(i) for i in range(300)]
for url in url_list:
    # 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。
    pool.submit(task, url)

print("执行中...")
pool.shutdown(True)  # 等待线程池中的任务执行完毕后,在继续执行
print('继续往下走')

示例3:任务执行完任务,再干点其他事。

import time
import random
from concurrent.futures import ThreadPoolExecutor, Future


def task(video_url):
    print("开始执行任务", video_url)
    time.sleep(2)
    return random.randint(0, 10)


def done(response):
    print("任务执行后的返回值", response.result())


# 创建线程池,最多维护10个线程。
pool = ThreadPoolExecutor(10)

url_list = ["www.xxxx-{}.com".format(i) for i in range(15)]

for url in url_list:
    # 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。
    future = pool.submit(task, url)
    future.add_done_callback(done) # 是子主线程执行
    
# 可以做分工,例如:task专门下载,done专门将下载的数据写入本地文件。

示例4:最终统一获取结果。

import time
import random
from concurrent.futures import ThreadPoolExecutor,Future


def task(video_url):
    print("开始执行任务", video_url)
    time.sleep(2)
    return random.randint(0, 10)


# 创建线程池,最多维护10个线程。
pool = ThreadPoolExecutor(10)

future_list = []

url_list = ["www.xxxx-{}.com".format(i) for i in range(15)]
for url in url_list:
    # 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。
    future = pool.submit(task, url)
    future_list.append(future)
    
pool.shutdown(True)
for fu in future_list:
    print(fu.result())

六.单例模式(扩展)

面向对象 + 多线程相关的一个面试题(以后项目和源码中会用到)。

之前写一个类,每次执行 类() 都会实例化一个类的对象。

class Foo:
    pass

obj1 = Foo()

obj2 = Foo()
print(obj1,obj2)

 简单的实现单例模式

class Singleton:
    instance = None

    def __init__(self, name):
        self.name = name
        
    def __new__(cls, *args, **kwargs):
        # 返回空对象
        if cls.instance:
            return cls.instance
        cls.instance = object.__new__(cls)
        return cls.instance

obj1 = Singleton('alex')
obj2 = Singleton('SB')

print(obj1,obj2)

 多线程执行单例模式,有BUG

import threading
import time


class Singleton:
    instance = None

    def __init__(self, name):
        self.name = name

    def __new__(cls, *args, **kwargs):
        if cls.instance:
            return cls.instance
        time.sleep(0.1)
        cls.instance = object.__new__(cls)
        return cls.instance


def task():
    obj = Singleton('x')
    print(obj)


for i in range(10):
    t = threading.Thread(target=task)
    t.start()

加锁解决BUG

import threading
import time
class Singleton:
    instance = None
    lock = threading.RLock()

    def __init__(self, name):
        self.name = name
        
    def __new__(cls, *args, **kwargs):
        with cls.lock:
            if cls.instance:
                return cls.instance
            time.sleep(0.1)
            cls.instance = object.__new__(cls)
        return cls.instance

def task():
    obj = Singleton('x')
    print(obj)

for i in range(10):
    t = threading.Thread(target=task)
    t.start()

加判断,提升性能

import threading
import time
class Singleton:
    instance = None
    lock = threading.RLock()

    def __init__(self, name):
        self.name = name
        
    def __new__(cls, *args, **kwargs):

        if cls.instance:
            return cls.instance
        with cls.lock:
            if cls.instance:
                return cls.instance
            time.sleep(0.1)
            cls.instance = object.__new__(cls)
        return cls.instance

def task():
    obj = Singleton('x')
    print(obj)

for i in range(10):
    t = threading.Thread(target=task)
    t.start()

# 执行1000行代码

data = Singleton('asdfasdf')
print(data)


http://www.kler.cn/a/291685.html

相关文章:

  • Gin HTML 模板渲染
  • Vue3中实现插槽使用
  • OpenCV从入门到精通实战(九)——基于dlib的疲劳监测 ear计算
  • JDK17 安装使用
  • 深挖C++赋值
  • 正则表达式语法详解(python)
  • JS中【普通函数中的this】vs【箭头函数中的this】
  • 【Python控制台小游戏】剑与魔法
  • P3631 [APIO2011] 方格染色
  • 深度学习速通系列:Bert模型vs大型语言模型(LLM)
  • 【前端面试】采用react前后,浏览器-解析渲染UI的变化
  • 解决jupyter notebook启动需要密码的问题
  • Zabbix_Proxy自动化安装脚本
  • 五分钟搭建微信机器人保姆级教程
  • SSG页面加上了 revalidate,是不是就变成了 ISG?
  • WebRTC协议下的视频汇聚融合技术:EasyCVR视频技术构建高效视频交互体验
  • python-Flask搭建简易登录界面
  • Java 7.3 - 分布式 id
  • linux——进程
  • v$session_longops监控 PDB clone 进度
  • Elasticsearch在高并发下如何保证读写一致性
  • Git创建项目
  • 一款云笔记支持在线协同文档,脑图,白板演示的工具,多个设备同步,让灵感与你同行(附源码)
  • 深度学习实战3--GAN:基础手写数字对抗生成
  • HarmonyOS开发实战( Beta5版)不要使用函数/方法作为复用组件的入参规范实践
  • 基于vue框架的车辆交易管理系统n5xwr(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。