当前位置: 首页 > article >正文

处理海量数据的查重方法总结

处理海量数据的查重方法总结

在面对超出内存处理能力的大数据场景(如日志处理、大型文件去重等)时,需要设计高效的算法和系统架构。以下是针对不同场景提出的几种解决方案,包括具体实现思路及适用场景。


1. 使用哈希表查重(大数据量,存储优化)

适用场景:
  • 数据量较大,但内存仍可承载。
  • 数据可以分块处理,逐步加载到内存。
完整代码:
#include <iostream>
#include <unordered_map>
#include <vector>
#include <fstream>
#include <string>

using namespace std;

// 处理每一个数据块
void process_chunk(const vector<string>& chunk, unordered_map<string, int>& global_map) {
    for (const auto& line : chunk) {
        global_map[line]++;
    }
}

// 处理整个文件
void process_large_file(const string& filename) {
    ifstream infile(filename);
    unordered_map<string, int> global_map;  // 哈希表,用于存储数据及其出现次数
    vector<string> chunk;
    string line;
    size_t chunk_size = 100000;  // 每个块包含100,000行数据

    // 按块读取文件并处理
    while (getline(infile, line)) {
        chunk.push_back(line);
        if (chunk.size() >= chunk_size) {
            process_chunk(chunk, global_map);
            chunk.clear();  // 处理完当前块,清空缓存,准备下一块
        }
    }

    // 处理文件的最后一部分
    if (!chunk.empty()) {
        process_chunk(chunk, global_map);
    }

    // 输出重复的数据及其出现次数
    for (const auto& entry : global_map) {
        if (entry.second > 1) {
            cout << "重复的数据: " << entry.first << ", 重复次数: " << entry.second << endl;
        }
    }
}

int main() {
    string filename = "large_data.txt";  // 假设数据存储在 large_data.txt 文件中
    process_large_file(filename);
    return 0;
}
解释:
  • 使用哈希表(unordered_map)存储数据及其出现次数。
  • 数据按块读取,每个块包含100,000行,逐步处理数据。
  • 输出重复数据及其重复次数。

2. 使用布隆过滤器查重(大数据量,空间效率)

适用场景:
  • 数据量极大,内存受限,适合用布隆过滤器来查重。
  • 布隆过滤器会有一定的误判率,但可以大幅降低内存使用。
完整代码:
#include <iostream>
#include <bitset>
#include <vector>
#include <functional>
#include <fstream>

using namespace std;

class BloomFilter {
public:
    BloomFilter(size_t size) : bitset(size), hash_count(3) {}

    // 插入数据到布隆过滤器
    void insert(const string& item) {
        for (int i = 0; i < hash_count; ++i) {
            size_t hash_value = hash_fn(item, i) % bitset.size();
            bitset[hash_value] = 1;
        }
    }

    // 检查数据是否已经存在
    bool contains(const string& item) {
        for (int i = 0; i < hash_count; ++i) {
            size_t hash_value = hash_fn(item, i) % bitset.size();
            if (!bitset[hash_value]) {
                return false;  // 如果有任何哈希值对应位为0,则数据一定不存在
            }
        }
        return true;  // 否则,数据可能存在
    }

private:
    bitset<10000000> bitset;  // 使用更大的位图
    int hash_count;           // 哈希函数的数量

    // 哈希函数,使用不同的种子来生成不同的哈希值
    size_t hash_fn(const string& item, int seed) const {
        return std::hash<string>{}(item + to_string(seed));
    }
};

// 处理大文件并使用布隆过滤器查重
void process_large_file_with_bloom_filter(const string& filename) {
    ifstream infile(filename);
    BloomFilter bf(10000000);  // 创建一个布隆过滤器实例
    string line;

    // 逐行读取数据并使用布隆过滤器查重
    while (getline(infile, line)) {
        if (bf.contains(line)) {
            cout << "重复数据: " << line << endl;
        } else {
            bf.insert(line);  // 插入新的数据
        }
    }
}

int main() {
    string filename = "large_data.txt";  // 假设数据存储在 large_data.txt 文件中
    process_large_file_with_bloom_filter(filename);
    return 0;
}
解释:
  • 使用布隆过滤器对数据进行查重。
  • 布隆过滤器利用哈希函数映射数据到一个位图中,检查数据是否已存在。
  • 具有较低内存消耗,但会存在误判的概率。

3. 使用分布式查重(处理更大规模的数据)

适用场景:
  • 数据量极其庞大,甚至达到PB级别,需要多个机器分布式处理。
  • 适用于分布式环境中数据的并行处理。
完整代码:
#include <iostream>
#include <unordered_map>
#include <fstream>
#include <string>
#include <vector>

using namespace std;

// 处理每一个数据块,计算出现次数
void process_chunk(const vector<string>& chunk, unordered_map<string, int>& local_map) {
    for (const auto& line : chunk) {
        local_map[line]++;
    }
}

// 合并多个机器的结果
void process_large_data_in_parallel(const string& filename, const vector<string>& chunk_files) {
    unordered_map<string, int> global_map;  // 全局哈希表
    size_t chunk_size = 100000;  // 每个块包含100,000行数据

    // 模拟并行处理文件块
    for (const string& chunk_file : chunk_files) {
        ifstream infile(chunk_file);
        vector<string> chunk;
        string line;

        // 读取并处理当前块的数据
        while (getline(infile, line)) {
            chunk.push_back(line);
            if (chunk.size() >= chunk_size) {
                process_chunk(chunk, global_map);  // 更新全局哈希表
                chunk.clear();
            }
        }

        // 处理文件块的最后一部分
        if (!chunk.empty()) {
            process_chunk(chunk, global_map);
        }
    }

    // 输出重复数据
    for (const auto& entry : global_map) {
        if (entry.second > 1) {
            cout << "重复数据: " << entry.first << ", 重复次数: " << entry.second << endl;
        }
    }
}

int main() {
    vector<string> chunk_files = {"chunk1.txt", "chunk2.txt", "chunk3.txt"};  // 假设数据已被分割成多个文件
    process_large_data_in_parallel("large_data.txt", chunk_files);
    return 0;
}
解释:
  • 数据被分割成多个块,每个块在不同的机器或线程上处理。
  • 各个机器处理完的数据将汇总到一个全局哈希表中,最后输出重复数据。

4. 使用 MapReduce 框架进行分布式查重

适用场景:
  • 适用于超大数据集,通常是分布式计算场景。
  • 可以在多个节点或机器上并行处理。
完整代码:
#include <iostream>
#include <unordered_map>
#include <fstream>
#include <vector>
#include <string>

using namespace std;

// Map函数:处理数据块
void map_function(const vector<string>& chunk, vector<unordered_map<string, int>>& intermediate_results) {
    unordered_map<string, int> local_map;
    for (const auto& line : chunk) {
        local_map[line]++;
    }
    intermediate_results.push_back(local_map);  // 将结果存入中间结果集
}

// Reduce函数:合并所有Map结果
void reduce_function(const vector<unordered_map<string, int>>& mapped_data) {
    unordered_map<string, int> global_map;

    // 合并所有节点的计数
    for (const auto& data : mapped_data) {
        for (const auto& entry : data) {
            global_map[entry.first] += entry.second;
        }
    }

    // 输出重复数据
    for (const auto& entry : global_map) {
        if (entry.second > 1) {
            cout << "重复数据: " << entry.first << ", 重复次数: " << entry.second << endl;
        }
    }
}

// 模拟MapReduce流程
void process_large_data_with_mapreduce(const string& filename) {
    ifstream infile(filename);
    vector<string> chunk;
    string line;
    size_t chunk_size = 100000;  // 每个块包含100,000行数据
    vector<unordered_map<string, int>> intermediate_results;

    while (getline(infile, line)) {
        chunk.push_back(line);
        if (chunk.size() >= chunk_size) {
            map_function(chunk, intermediate_results);  // 执行Map函数
            chunk.clear

();
        }
    }

    if (!chunk.empty()) {
        map_function(chunk, intermediate_results);  // 处理最后一部分数据
    }

    reduce_function(intermediate_results);  // 执行Reduce函数
}

int main() {
    string filename = "large_data.txt";
    process_large_data_with_mapreduce(filename);
    return 0;
}
解释:
  • Map 函数处理数据块,生成中间结果。
  • Reduce 函数汇总所有中间结果,合并并输出重复数据。

总结

方法优点缺点适用场景
哈希表查重简单易实现,性能好内存占用大中等规模数据
布隆过滤器查重节省空间,适合极大数据量存在误判,不支持计数极大数据量,空间受限场景
分布式查重支持超大规模数据,扩展性好开发成本和系统管理要求高分布式环境,大规模数据
MapReduce高度并行化,适合超大数据集需要大数据框架支持,实现复杂PB级别数据,分布式计算场景

根据实际需求和资源条件选择合适的方案。


http://www.kler.cn/a/427983.html

相关文章:

  • C++ 堆栈分配的区别
  • 【新春特辑】2025年1月科技浪潮中的AI最新时事与科技趋势
  • 百度热力图数据获取,原理,处理及论文应用5
  • 解锁豆瓣高清海报:深度爬虫与requests进阶之路
  • 网络爬虫学习:应用selenium获取Edge浏览器版本号,自动下载对应版本msedgedriver,确保Edge浏览器顺利打开。
  • 复古壁纸中棕色系和米色系哪个更受欢迎?
  • 【WRF运行第一期(Ubuntu)】模型运行前准备
  • 高数极限与连续练习题(自用)
  • 网络渗透实验二(渗透课)
  • 新160个crackme - 109-Jony-crackme
  • ElementUI:el-tabs 切换之前判断是否满足条件
  • docker-3.docker权限问题
  • 开发一个AMT(automatic multicast tunnel)协议库 C++版本,Client,Server详细的设计
  • STM32F103单片机使用STM32CubeMX创建IAR串口工程
  • mac 安装python3和配置环境变量
  • 【Leetcode Top 100】146. LRU 缓存
  • Octo—— 基于80万个机器人轨迹的预训练数据集用于训练通用机器人,可在零次拍摄中解决各种任务
  • 网络资源模板--Android Studio 实现绿豆通讯录
  • 【springboot】 多数据源实现
  • 塑胶模具基本结构及塑胶成型原理
  • ubuntu 使用USB转TTL线连接树莓派4B
  • 【Android】ARouter源码解析
  • Python 信息科技赛课区一等奖教案(语音合成技术)
  • 【Elasticsearch】初始化默认字段及分词
  • Python中的数据可视化实战
  • spring-boot-starter-validation校验启动器简述