当前位置: 首页 > article >正文

鸿蒙多线程应用-taskPool

 并发模型

      并发模型是用来实现不同应用场景中并发任务的编程模型,常见的并发模型分为基于内存共享的并发模型和基于消息通信的并发模型。

        Actor并发模型作为基于消息通信并发模型的典型代表,不需要开发者去面对锁带来的一系列复杂偶发的问题,同时并发度也相对较高,因此得到了广泛的支持和使用。

       当前鸿蒙ArkTS提供了TaskPool和Worker两种并发能力,TaskPool和Worker都基于Actor并发模型实现。

       内存共享并发模型指多线程同时执行任务,这些线程依赖同一内存并且都有权限访问,线程访问内存前需要抢占并锁定内存的使用权,没有抢占到内存的线程需要等待其他线程释放使用权再执行。

       Actor并发模型每一个线程都是一个独立Actor,每个Actor有自己独立的内存,Actor之间通过消息传递机制触发对方Actor的行为,不同Actor之间不能直接访问对方的内存空间。Actor并发模型对比内存共享并发模型的优势在于不同线程间内存隔离,不会产生不同线程竞争同一内存资源的问题。开发者不需要考虑对内存上锁导致的一系列功能、性能问题,提升了开发效率。

       由于Actor并发模型线程之间不共享内存,需要通过线程间通信机制传输并发任务和任务结果。

TaskPool简介

      任务池(TaskPool)作用是为应用程序提供一个多线程的运行环境,降低整体资源的消耗、提高系统的整体性能,且您无需关心线程实例的生命周期。

     TaskPool支持开发者在宿主线程封装任务抛给任务队列,系统选择合适的工作线程,进行任务的分发及执行,再将结果返回给宿主线程。接口直观易用,支持任务的执行、取消,以及指定优先级的能力,同时通过系统统一线程管理,结合动态调度及负载均衡算法,可以节约系统资源。系统默认会启动一个任务工作线程,当任务较多时会扩容,工作线程数量上限跟当前设备的物理核数相关,具体数量内部管理,保证最优的调度及执行效率,长时间没有任务分发时会缩容,减少工作线程数量。

TaskPool注意事项

  • 实现任务的函数需要使用@Concurrent装饰器标注,且仅支持在.ets文件中使用。

  • 从API version 11开始,跨并发实例传递带方法的实例对象时,该类必须使用装饰器@Sendable装饰器标注,且仅支持在.ets文件中使用。

  • 任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时),否则会被强制退出。

  • 实现任务的函数入参需满足序列化支持的类型,详情请参见线程间通信对象。

  • ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口setTransferList()设置。

  • 由于不同线程中上下文对象是不同的,因此TaskPool工作线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用。

  • 序列化传输的数据量大小限制为16MB。

  • Priority的IDLE优先级是用来标记需要在后台运行的耗时任务(例如数据同步、备份),它的优先级别是最低的。这种优先级标记的任务只会在所有线程都空闲的情况下触发执行,并且只会占用一个线程来执行。

  • Promise不支持跨线程传递,如果TaskPool返回pending或rejected状态的Promise,会返回失败;对于fulfilled状态的Promise,TaskPool会解析返回的结果,如果结果可以跨线程传递,则返回成功。

  • 不支持在TaskPool工作线程中使用AppStorage。

TaskPool应用实例

       生产者消费者模型应用taskPool的具体代码实现

1.生产者


import { taskpool } from '@kit.ArkTS';
import { stingToUint8, uint8TransformString } from './utils';

@Concurrent
export async function producer(ArrayBuffer: Int32Array, dataBuffer: Uint8Array, newStr: string) {
  let i32a = ArrayBuffer;
  let array = dataBuffer
  if (array[array.length-1] !== 0) {
    taskpool.Task.sendData(false)
    let runner = new taskpool.SequenceRunner()
    console.log("-----atomics-producer-push-fal-" + newStr)
    return
  }
  let jsonStr: string = uint8TransformString(array)

  let arr: string[] = []
  try {
    arr= JSON.parse(jsonStr) as string[]
  } catch (e) {
    taskpool.Task.sendData(false)
    return
  }

  arr.push(newStr)
  let newArrJson = JSON.stringify(arr) ?? ''
  //console.log("newArrJson" + newArrJson)
  let isFinish = stingToUint8(newArrJson,array,4)
  if (!isFinish) {
    arr.pop()
    let newArrJson1 = JSON.stringify(arr) ?? ''
    stingToUint8(newArrJson1,array,4)
    taskpool.Task.sendData(false)
    console.log("-----atomics-producer-push-fal-" + newStr)
  }else{
    console.log("-----atomics-producer-push-sec-" + newStr)
  }

  Atomics.notify(i32a, 0, 1)
  Promise.resolve()
}

2.消费者


import { getStringArrayFromJson, testMethod, uint8TransformString} from './utils';
import { buffer, taskpool } from '@kit.ArkTS';
import { ThreadUtils } from './ThreadUtils';

@Concurrent
export async function consumerTask(ArrayBuffer: Int32Array, dataBuffer: Uint8Array): Promise<void> {
  let i32a = ArrayBuffer;
  let array = dataBuffer

  while (true) {
    let jsonStr: string = uint8TransformString(array)
    let arr = getStringArrayFromJson(jsonStr)

    if (arr.length == 0) {
      Atomics.wait(i32a, 0, 0);
    } else {
      let i = 4
      for (let index = 0; index < array.byteLength; index++) {
        if (i >= array.byteLength) {
          break
        }
        Atomics.store(array, i++, 0)
      }
      taskpool.Task.sendData(true)
      let writeResult: boolean = true
      while ((writeResult == true || writeResult == false)) {
        let ele = arr.shift()
        if (!ele) {
          break
        }
        writeResult = await ThreadUtils.getInstance().writeToFile(ele)
        console.log('-----atomics-consumer-' + ele)
      }

    }
  }
}


3.字符串和字节码相互转换工具

export function testMethod(str: string) {
  console.log('--test-function-str-' + str)
}
export function uint8TransformString(array:Uint8Array): string{
  let jsonStr: string = JSON.stringify([])
  let tempArr: number[] = []
  let j = 0
  for (let index = 0; index < array.length; index++) {
    if (array[index] == 0) {
      continue
    }
    tempArr[j++] = array[index]
  }

  let temp = new Uint8Array(tempArr)
  if (temp.byteLength > 0) {
    let str = '';
    for (let i = 0; i < temp.length; ) {
      let byte1 = temp[i];
      let codePoint: number

      if (byte1 >> 7 === 0) { // 1字节
        codePoint = byte1;
        i += 1;
      } else if (byte1 >> 5 === 0b110) { // 2字节
        codePoint = ((byte1 & 0b11111) << 6) | (temp[i + 1] & 0b111111);
        i += 2;
      } else if (byte1 >> 4 === 0b1110) { // 3字节
        codePoint = ((byte1 & 0b1111) << 12) | ((temp[i + 1] & 0b111111) << 6) | (temp[i + 2] & 0b111111);
        i += 3;
      } else {
        // 错误处理:不支持的字节序列
        i += 1; // 跳过当前字节
        continue;
      }
      str += String.fromCodePoint(codePoint)
      console.info('字节流转成可理解的字符串:' + str);

    }
    jsonStr = str
  }


  return jsonStr
}
//
export function stingToUint8(json: string, array:Uint8Array,formIndex: number = 0) : boolean{
  let i = formIndex
  let isFinish = true
  for (let index = 0; index < json.length; index++) {
    if (i >= array.byteLength) {
      if (index < json.length - 1) {
        isFinish = false
      }
      break
    }
    const element = json.charCodeAt(index);
    if (element > 0x7FF) {
      Atomics.store(array, i++, (0xE0 | (element >> 12)))
      Atomics.store(array, i++, (0x80 | ((element >> 6) & 0x3F)))
      Atomics.store(array, i++, (0x80 | (element & 0x3F)))
    } else if (element > 0x7F) {
      Atomics.store(array, i++, (0xC0 | (element >> 6)))
      Atomics.store(array, i++, (0x80 | (element & 0x3F)))
    } else {
      Atomics.store(array, i++, (element))
    }
  }
  //剩余空间赋值0
  for (let index = i; index < array.length; index++) {
    array[index] = 0
  }
  return isFinish
}

4.单例工具

import { taskpool } from '@kit.ArkTS';
import { it } from '@ohos/hypium';
import { consumerTask } from './consumer';
import { producer } from './product';

export class ThreadUtils {

  private tempLogList: Array<string> = new Array()

  private static instance: ThreadUtils
  private sab :SharedArrayBuffer
  private ui8 :Uint8Array
  private i32a :Int32Array

  private constructor(bufferSize:number = 1024) {
    this.sab = new SharedArrayBuffer(bufferSize)
    this.ui8 = new Uint8Array(this.sab)
    this.i32a = new Int32Array(this.sab)
    this.startConsumer()
  };

  writeLog(log: string) {
    if (this.flag) {
      this.tempLogList.push(log)
    }else {
      this.product(log)
    }

  }

  public static getInstance(bufferSize:number = 1024): ThreadUtils {
    if (!ThreadUtils.instance) {
      ThreadUtils.instance = new ThreadUtils(bufferSize);
    }
    return ThreadUtils.instance;
  }


  async writeToFile(content: string): Promise<boolean> {
    return new Promise((resolve, reject) => {
      setTimeout(() => {

        console.log("日志写入完成=" + content)
        console.log('pop element=' + content)
        resolve(true)
      }, 4000)
    })
  }

  lastTask:taskpool.Task | undefined
  flag = false
  async product(log: string):Promise<boolean> {
    return new Promise<boolean>((resolve,reject)=>{
      let newLog = log
      let task = new taskpool.Task(producer, this.i32a, this.ui8, newLog)
      if (this.lastTask) {
        task.addDependency(this.lastTask)
      }

      this.lastTask = task
      task.onReceiveData((success: boolean) => {
        if (!success) {
          this.flag = true
          this.tempLogList.unshift(log)
          resolve(false)
        }
      })
      taskpool.execute(task).then(()=>{
        console.log('------taskpool.execute.then-----')
        resolve(true)
      });
    })


  }

  isWhile = false
  async startConsumer() {
    let task = new taskpool.Task(consumerTask, this.i32a, this.ui8)
    task.onReceiveData(async (hasSpace: boolean) => {
      if (hasSpace) {
        this.flag = false
        if (this.tempLogList.length > 0 && this.isWhile == false){
          let item = this.tempLogList.shift()
          console.log('---item---'+ item)
          this.isWhile = true
          let com = true
          while (item && this.flag == false && com){
            com = await this.product(item)
            item = this.tempLogList.shift()
          }
          this.isWhile = false
        }

      }
    })
    taskpool.execute(task)
  }

}

5.页面UI应用

import { buffer, taskpool } from '@kit.ArkTS';
import { consumerTask } from '../consumer';
import { producer } from '../product';
import { router } from '@kit.ArkUI';
import { ThreadUtils } from '../ThreadUtils';


@Entry
@Component
struct Index {
  timer = -1
  count = 0
  logTool = ThreadUtils.getInstance(32)
  aboutToAppear(): void {

  }



  @State inputText:string =''
  build() {
    Column({space: 20}) {

      TextInput({text: $$this.inputText})
        .width('80%')
      Button() {
        Text("生产日志").padding(10)
      }
      .backgroundColor(Color.Gray)
      .onClick(async () => {

        this.timer = setInterval(()=>{
          this.logTool.writeLog ('item' + this.count)
          this.count += 1
        },1000)

      })
      Button() {
        Text("停止生产").padding(10)
      }
      .backgroundColor(Color.Gray)
      .onClick(async () => {
        clearInterval(this.timer)
        // router.pushUrl({
        //   url: 'pages/TaskPoolPage'
        // })

      })

    }
    .alignItems(HorizontalAlign.Center)
    .justifyContent(FlexAlign.Center)
    .height('100%')
    .width('100%')
  }
}


http://www.kler.cn/a/414985.html

相关文章:

  • 52-基于单片机的超声波、温湿度、光照检测分阶段报警
  • 【前端】跨域问题与缓存
  • 微软要求 Windows Insider 用户试用备受争议的召回功能
  • ClamAV 在 CentOS 的开发版本 `clamav-devel`
  • 关于Vscode配置Unity环境时的一些报错问题(持续更新)
  • java-分而治之算法
  • spark3.x之后时间格式数据偶发报错org.apache.spark.SparkUpgradeException
  • Linux中网络文件系统nfs使用
  • S4 UPA of AA :新资产会计概览
  • 如何使用PHP爬虫获取店铺详情:一篇详尽指南
  • 初识 Django
  • 2024第六次随堂测验参考答案
  • leetcode 208. 实现 Trie (前缀树)
  • pico-sdk(八)-程序架构之自定义预处理变量
  • 【opencv-python】的cv2.imdecode()与cv2.imencode()
  • 力扣--LCR 148.验证图书取出顺序
  • 二维码有哪些网络安全风险隐患?
  • 【C语言篇】探索 C 语言结构体:从基础语法到数据组织的初体验
  • 力扣,88. 合并两个有序数组
  • [2024年3月10日]第15届蓝桥杯青少组stema选拔赛C++中高级(第二子卷、编程题(1))
  • 项目整合logback日志打印线程id
  • GraphRAG访问模式和知识图谱建模
  • HarmonyOS-初级(一)
  • 【ANC系统】主动噪声控制系统结构分类
  • 前端——自定义组件
  • ubuntu防火墙入门(一)——设置服务、关闭端口