当前位置: 首页 > article >正文

AWS EMR使用Apache Kylin快速分析大数据

在AWS Elastic MapReduce(EMR)集群上部署和使用Apache Kylin,以实现对大规模数据集的快速分析,企业可以充分利用云计算的强大资源和Kylin的数据分析能力,实现快速、高效的数据分析。以下是该案例的详细步骤和要点:

背景

Apache Kylin是一个开源的分布式分析引擎,设计用于处理超大规模数据集,提供亚秒级的查询响应时间。AWS(Amazon Web Services)是亚马逊公司的云计算平台,提供包括弹性计算、存储、数据库在内的一整套云计算服务。结合AWS的强大计算能力和Kylin的数据分析能力,企业可以加速数据分析过程,提升数据挖掘能力。

实施过程

  1. 准备AWS服务资源

    • 创建一个AWS账号,并配置必要的权限。

    • 了解与Amazon EMR集群相关的AWS服务资源,如VPC(Virtual Private Cloud)、EC2(Elastic Compute Cloud)和S3(Simple Storage Service)。

  2. 创建Amazon EMR集群

    • 在AWS控制台中选择EMR服务,点击“创建集群”。

    • 配置集群参数,包括选择EMR版本(如emr-5.21.0或更高版本,以确保支持Apache Kylin)、实例类型、数量以及网络设置等。

    • 勾选Apache Kylin运行必需的服务组件,如Hadoop、HBase、Hive等。

  3. 在EMR集群上安装Kylin

    • 登录到EMR集群的主节点。

    • 下载并解压Apache Kylin安装包。

    • 配置Kylin的环境变量和kylin.properties文件。

    • 替换必要的Jar包,以确保Kylin与EMR集群中的其他服务组件兼容。

  4. 配置Kylin数据源和Cube

    • 将数据存储在AWS的S3或HDFS中,并使用Hive进行预处理和清洗。

    • 在Kylin中定义数据源,指向存储在S3或HDFS中的数据。

    • 创建Cube,定义维度和度量,以及分区策略。

  5. 构建和查询Cube

    • 配置Cube构建任务,定期从数据源中提取数据并加载到Kylin中进行预计算。

    • 使用Kylin的Web界面或REST API进行查询,享受亚秒级的查询响应时间。

结果

通过在AWS的EMR集群上部署Apache Kylin,企业可以实现以下效益:

• 加速数据分析:Kylin的预计算机制显著减少了实时查询的计算量,提高了查询速度。

• 降低成本:利用AWS的按需付费和弹性扩展特性,企业可以根据实际需求灵活调整资源使用,降低IT投入成本。

• 提高系统稳定性:Kylin的分布式架构和高可用性设计确保了系统在高并发查询下的稳定运行。

示例代码

以下是一个在AWS EMR上创建Kylin Cube的示例代码:

 CREATE CUBE my_cube
DIMENSIONS (
    dimension1,
    dimension2
)
MEASURES (
    SUM(measure1),
    COUNT(measure2)
)
PARTITIONED BY (partition_date);

此代码创建了一个名为my_cube的Cube,包含了两个维度dimension1和dimension2,以及两个度量SUM(measure1)和COUNT(measure2)。数据按partition_date进行分区。

以下是在AWS EMR上部署Apache Kylin并实现数据分析的具体流程与关键Python代码实现:


一、AWS EMR集群创建(Python自动化)

使用boto3库自动化创建EMR集群:

import boto3

def create_emr_cluster():
    emr = boto3.client('emr', region_name='us-west-2')
    response = emr.run_job_flow(
        Name='Kylin-EMR-Cluster',
        ReleaseLabel='emr-6.8.0',  # 确保支持Kylin
        Applications=[
            {'Name': 'Hadoop'},
            {'Name': 'Hive'},
            {'Name': 'HBase'}
        ],
        Instances={
            'InstanceGroups': [
                {
                    'Name': 'MasterNode',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 1,
                },
                {
                    'Name': 'CoreNodes',
                    'Market': 'SPOT',  # 使用Spot实例降低成本
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 2,
                }
            ],
            'Ec2KeyName': 'your-key-pair',
            'KeepJobFlowAliveWhenNoSteps': True,
            'Ec2SubnetId': 'subnet-xxxxxx'
        },
        BootstrapActions=[
            {
                'Name': 'Install-Kylin',
                'ScriptBootstrapAction': {
                    'Path': 's3://your-bucket/install-kylin.sh'  # 引导脚本自动安装Kylin
                }
            }
        ],
        ServiceRole='EMR_DefaultRole',
        JobFlowRole='EMR_EC2_DefaultRole'
    )
    return response['JobFlowId']

# 执行创建
cluster_id = create_emr_cluster()
print(f"Cluster created with ID: {cluster_id}")

二、Kylin安装引导脚本(install-kylin.sh)

#!/bin/bash
# 下载并解压Kylin
wget https://archive.apache.org/dist/kylin/apache-kylin-3.1.2/apache-kylin-3.1.2-bin-hbase1x.tar.gz
tar -xzf apache-kylin-3.1.2-bin-hbase1x.tar.gz -C /opt/
mv /opt/apache-kylin-3.1.2-bin-hbase1x /opt/kylin

# 配置环境变量
echo 'export KYLIN_HOME=/opt/kylin' >> /etc/profile
echo 'export PATH=$KYLIN_HOME/bin:$PATH' >> /etc/profile
source /etc/profile

# 替换HBase兼容性JAR(根据EMR版本调整)
cp /usr/lib/hbase/lib/*.jar /opt/kylin/ext/

# 启动Kylin服务
kylin.sh start

三、Hive表创建(指向S3数据)

使用pyhive连接Hive并定义外部表:

from pyhive import hive

conn = hive.Connection(host='emr-master-node-ip', port=10000)
cursor = conn.cursor()

# 创建外部表指向S3数据
cursor.execute('''
CREATE EXTERNAL TABLE IF NOT EXISTS sales_data (
    transaction_id STRING,
    product_id STRING,
    sale_amount DOUBLE,
    transaction_date DATE
)
STORED AS PARQUET
LOCATION 's3://your-bucket/sales-data/'
''')
print("Hive table created successfully.")

四、Kylin Cube创建(REST API调用)

使用requests调用Kylin API创建Cube:

import requests
import json

kylin_url = 'http://<emr-master-ip>:7070/kylin/api'
headers = {'Content-Type': 'application/json', 'Authorization': 'Basic YWRtaW46S1lMSU4='}  # 默认admin/KYLIN

# 1. 创建项目
project_payload = {"name": "Sales_Project"}
requests.post(f'{kylin_url}/projects', headers=headers, data=json.dumps(project_payload))

# 2. 创建数据模型
model_payload = {
    "name": "sales_model",
    "project": "Sales_Project",
    "fact_table": "SALES_DATA",
    "lookups": [],
    "dimensions": [
        {"table": "SALES_DATA", "column": "PRODUCT_ID"},
        {"table": "SALES_DATA", "column": "TRANSACTION_DATE"}
    ],
    "metrics": ["SUM(SALE_AMOUNT)", "COUNT(TRANSACTION_ID)"],
    "partition_desc": {"partition_date_column": "TRANSACTION_DATE"}
}
requests.post(f'{kylin_url}/models', headers=headers, data=json.dumps(model_payload))

# 3. 创建Cube
cube_payload = {
    "name": "sales_cube",
    "model_name": "sales_model",
    "dimensions": [
        {"name": "PRODUCT_ID", "table": "SALES_DATA", "column": "PRODUCT_ID"},
        {"name": "TRANSACTION_DATE", "table": "SALES_DATA", "column": "TRANSACTION_DATE"}
    ],
    "measures": [
        {"name": "TOTAL_SALES", "function": {"expression": "SUM(SALE_AMOUNT)"}},
        {"name": "TRANSACTION_COUNT", "function": {"expression": "COUNT(TRANSACTION_ID)"}}
    ],
    "partition_date_start": "2023-01-01",
    "auto_merge_time_ranges": [7, 30]
}
response = requests.post(f'{kylin_url}/cubes', headers=headers, data=json.dumps(cube_payload))
print("Cube创建状态:", response.status_code)

五、触发Cube构建与查询

# 触发Cube构建
build_payload = {
    "startTime": "2023-01-01",
    "endTime": "2023-12-31",
    "buildType": "BUILD"
}
requests.put(f'{kylin_url}/cubes/sales_cube/build', headers=headers, data=json.dumps(build_payload))

# 执行SQL查询
query = """
SELECT PRODUCT_ID, SUM(SALE_AMOUNT) 
FROM SALES_DATA 
WHERE TRANSACTION_DATE BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY PRODUCT_ID
"""
result = requests.post(f'{kylin_url}/query', headers=headers, data=json.dumps({"sql": query}))
print("查询结果:", result.json())

关键要点说明

  1. 自动化部署:通过boto3和引导脚本实现EMR集群与Kylin的一键部署。
  2. 数据准备:Hive表直接映射S3数据,避免数据迁移。
  3. Cube优化:按日期分区和自动合并策略提升查询性能。
  4. 成本控制:使用Spot实例和EMR自动伸缩降低资源成本。
  5. 安全实践:在AWS中配置VPC和安全组限制访问来源IP。

实际部署时需替换代码中的占位符(如S3路径、EMR主节点IP),并根据数据规模调整EMR集群配置。


http://www.kler.cn/a/529950.html

相关文章:

  • 【Qt】Qt老版本解决中文乱码
  • 智能家居监控系统数据收集积压优化
  • ArkTS语言介绍
  • [原创](Modern C++)现代C++的关键性概念: 流格式化
  • 写好简历的三个关键认知
  • git基础使用--1--版本控制的基本概念
  • 第三篇:模型压缩与量化技术——DeepSeek如何在边缘侧突破“小而强”的算力困局
  • 《Origin画百图》之脊线图
  • 精品PPT | 企业大数据治理平台统一指标库建设方案
  • IM 即时通讯系统-51-MPush开源实时消息推送系统
  • 手写单层RNN网络,后续更新
  • K8S集群架构及主机准备
  • SQL索引优化_提高系统响应速度的秘诀
  • Deepseek R1 本地化部署指南:跨平台实战
  • react redux监测值的变化
  • 硕成C语言1笔记
  • Linux - 进程间通信(3)
  • IOC三种实现方式的区别
  • Brooks MX Marathon Expressrm User Manual MX集成系统平台
  • 建表注意事项(2):表约束,主键自增,序列[oracle]
  • Lesson 127 A famous actress
  • 一维数组0-1背包问题理论基础
  • w189电商平台的设计与实现
  • 尝试ai生成figma设计
  • 系统URL整合系列视频二(界面原型)
  • 独立开发经验谈:如何借助 AI 辅助产品 UI 设计