C# 关于实现保存数据以及数据溯源推送
前言
实现了一个数据接收、存储和推送的功能
首先定义我们数据存储的格式(可根据自己的需求定义格式):
数据切割符号:**$
是区分数据
与其他数据
的划分
数据内容切割号:|
**是区分时间戳
与内容数据
的划分
以下是我存储的文本格式Data.log
或者Data.txt
$2024-12-07 16:26:53.799|数据1
$2024-12-07 16:26:54.920|数据2
$2024-12-07 16:26:55.640|数据3
...
...
采集与推送:
以下是具体的代码内容
using DataAcquisitionModule.Helper;
using HslCommunication.MQTT;
using log4net;
using log4net.Core;
using Sunny.UI;
using Sunny.UI.Win32;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace DataAcquisitionModule.Server
{
public class DataReceiver
{
private static readonly ILog logger = LogManager.GetLogger(typeof(DataReceiver));
private static readonly object lockObject = new object();
private readonly string _filePath;
private readonly ConcurrentQueue<string> _dataQueue;
private readonly CancellationTokenSource _cancellationTokenSource;
private bool _isRunning;
public DataReceiver(string filePath)
{
_filePath = filePath;
_dataQueue = new ConcurrentQueue<string>();
_cancellationTokenSource = new CancellationTokenSource();
_message = new MessageLog();
// 启动数据存储任务
_ = StartSavingDataAsync(_cancellationTokenSource.Token);
}
//数据入列,多肽1
public void HandleMsg(byte[] msg)
{
var data = Encoding.UTF8.GetString(msg);
_dataQueue.Enqueue(data);
}
//数据入列,多肽2
public void HandleMsg(string topic, string msg)
{
string strMsg = $"{topic}#{msg}";
_dataQueue.Enqueue(strMsg);
}
private async Task StartSavingDataAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
if (_dataQueue.TryDequeue(out var data))
{
await SaveDataToFileAsync(data);
}
else
{
// 如果队列为空,短暂等待
await Task.Delay(100, cancellationToken);
}
}
}
private async Task SaveDataToFileAsync(string data)
{
try
{
logger.Info($"保存数据: {data}");
await File.AppendAllTextAsync(_filePath, $"${DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}|{data}\n");
}
catch (Exception ex)
{
logger.Error("保存数据到文件时发生错误", ex);
}
}
public async Task LoadAndProcessDataAsync(Action<string> callAction)
{
_isRunning = true;
while (!_cancellationTokenSource.Token.IsCancellationRequested && _isRunning)
{
List<string> lines = new List<string>();
lock (lockObject)
{
if (File.Exists(_filePath))
{
// 一次性读取整个文件内容
string fileContent = await File.ReadAllTextAsync(_filePath);
lines.AddRange(fileContent.Split('$', StringSplitOptions.RemoveEmptyEntries).Select(part => part.Trim()));
//RemoveEmptyEntries:返回数组元素移除空字符串元素(不包含空字符串元素);
}
}
if (lines.Count > 0)
{
DateTime startTime = DateTime.Parse(lines.First().Split('|')[0]);
DateTime timestamp = new DateTime();
TimeSpan delay = new TimeSpan(0);
string[] strData;
foreach (var line in lines)
{
if (!_isRunning)
{
break;
}
strData = line.Split('|');
if (strData.Length < 2 || string.IsNullOrEmpty(strData[1]))
{
continue;
}
timestamp = DateTime.Parse(strData[0]);
delay = timestamp - startTime;
if (delay.TotalMilliseconds > 0)
{
await Task.Delay(delay);
}
startTime = timestamp;
callAction?.Invoke(strData[1]);
logger.Info($"数据推送: {line}");
}
callAction?.Invoke("推送完毕!!!");
break;
}
await Task.Delay(10000); // 每10秒检查一次
}
}
public void StopPushData()
{
_isRunning = false;
}
public void StopReceiving()
{
_cancellationTokenSource.Cancel();
}
}
}
注意:
- 优化文件读取和处理
• 减少文件读取次数:避免频繁读取文件,特别是在文件较大时,可以考虑使用内存缓存。
• 优化文件写入:使用File.AppendAllText
方法代替 StreamWriter,减少文件打开和关闭的开销。 - 异步处理
• 异步文件操作:使用File.ReadAllLinesAsync
和 File.WriteAllLinesAsync 方法进行异步文件操作,减少阻塞。 - 错误处理
• 增加异常处理:在文件操作和网络通信中增加异常处理,确保程序的稳定性。 - 日志记录
• 日志记录:使用 log4net 或其他日志框架记录关键操作的日志,方便调试和维护。 - 代码结构优化
• 分离关注点:将数据接收、存储和推送逻辑分离到不同的类或方法中,提高代码的可读性和可维护性。
示例:数据采集和调用
假设我们有一个简单的控制台应用程序,用于启动 DataReceiver 并模拟数据的接收和处理。
1. 创建 DataReceiver 实例
首先,我们需要创建一个 DataReceiver 实例,并指定数据存储的文件路径。
2. 模拟数据接收
我们可以模拟从外部源(如 MQTT 消息队列)接收到的数据,并调用 HandleMsg 方法将其添加到队列中。
3. 处理和推送数据
启动一个任务来处理和推送数据,调用 LoadAndProcessDataAsync 方法。
示例代码
using DataAcquisitionModule.Helper;
using HslCommunication.MQTT;
using log4net;
using log4net.Config;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace DataAcquisitionModule
{
class Program
{
private static readonly ILog logger = LogManager.GetLogger(typeof(Program));
static async Task Main(string[] args)
{
// 配置 log4net
XmlConfigurator.Configure(new FileInfo("log4net.config"));
// 指定数据存储文件路径
string filePath = "data.log";
// 创建 DataReceiver 实例
DataReceiver dataReceiver = new DataReceiver(filePath);
// 模拟数据接收
SimulateDataReception(dataReceiver);
// 启动数据处理和推送任务
await dataReceiver.LoadAndProcessDataAsync(data =>
string[] msg = data.Split('#');
if (msg.Length == 2)
{
mqttServer.PublishAllClientTopicPayload(msg[0], Encoding.UTF8.GetBytes(msg[1]));
Console.WriteLine($"推送的topic: {msg[0]}");
Console.WriteLine($"处理具体的数据: {Encoding.UTF8.GetBytes(msg[1])}");
}
);
// 模拟停止数据接收和推送
Console.WriteLine("按任意键停止数据接收和推送...");
Console.ReadKey();
dataReceiver.StopPushData();
dataReceiver.StopReceiving();
}
private static void SimulateDataReception(DataReceiver dataReceiver)
{
// 模拟从外部源接收到的数据
Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
string topic = "Topic" + i;
string message = "Message" + i;
dataReceiver.HandleMsg(topic, message);
await Task.Delay(1000); // 模拟每秒接收一条数据
}
});
}
}
}
解释
- 配置 log4net:
• 使用 XmlConfigurator.Configure 方法加载 log4net 配置文件,确保日志记录正常工作。 - 创建 DataReceiver 实例:
• 指定数据存储文件路径 data.log,并创建 DataReceiver 实例。 - 模拟数据接收:
• 使用SimulateDataReception
方法模拟从外部源接收到的数据。这里使用一个 Task 来模拟每秒接收一条数据,并调用 HandleMsg 方法将数据添加到队列中。 - 启动数据处理和推送任务:
• 调用LoadAndProcessDataAsync
方法启动数据处理和推送任务。这里使用 Console.WriteLine 方法来模拟数据处理操作。 - 停止数据接收和推送:
• 按任意键停止数据接收和推送任务,调用StopPushData
和StopReceiving
方法。
运行结果
运行上述代码后,程序会模拟接收数据并将其存储到 data.log 文件中。然后,程序会读取文件中的数据并按时间顺序推送,每条数据的推送时间间隔与实际接收时间间隔一致。
通过这种方式,验证 DataReceiver 类的功能,其实大家可以根据实际需求进行调整和扩展。我这边只是简单演示