AWS EMR使用Apache Kylin快速分析大数据
在AWS Elastic MapReduce(EMR)集群上部署和使用Apache Kylin,以实现对大规模数据集的快速分析,企业可以充分利用云计算的强大资源和Kylin的数据分析能力,实现快速、高效的数据分析。以下是该案例的详细步骤和要点:
背景
Apache Kylin是一个开源的分布式分析引擎,设计用于处理超大规模数据集,提供亚秒级的查询响应时间。AWS(Amazon Web Services)是亚马逊公司的云计算平台,提供包括弹性计算、存储、数据库在内的一整套云计算服务。结合AWS的强大计算能力和Kylin的数据分析能力,企业可以加速数据分析过程,提升数据挖掘能力。
实施过程
-
准备AWS服务资源
• 创建一个AWS账号,并配置必要的权限。
• 了解与Amazon EMR集群相关的AWS服务资源,如VPC(Virtual Private Cloud)、EC2(Elastic Compute Cloud)和S3(Simple Storage Service)。
-
创建Amazon EMR集群
• 在AWS控制台中选择EMR服务,点击“创建集群”。
• 配置集群参数,包括选择EMR版本(如emr-5.21.0或更高版本,以确保支持Apache Kylin)、实例类型、数量以及网络设置等。
• 勾选Apache Kylin运行必需的服务组件,如Hadoop、HBase、Hive等。
-
在EMR集群上安装Kylin
• 登录到EMR集群的主节点。
• 下载并解压Apache Kylin安装包。
• 配置Kylin的环境变量和kylin.properties文件。
• 替换必要的Jar包,以确保Kylin与EMR集群中的其他服务组件兼容。
-
配置Kylin数据源和Cube
• 将数据存储在AWS的S3或HDFS中,并使用Hive进行预处理和清洗。
• 在Kylin中定义数据源,指向存储在S3或HDFS中的数据。
• 创建Cube,定义维度和度量,以及分区策略。
-
构建和查询Cube
• 配置Cube构建任务,定期从数据源中提取数据并加载到Kylin中进行预计算。
• 使用Kylin的Web界面或REST API进行查询,享受亚秒级的查询响应时间。
结果
通过在AWS的EMR集群上部署Apache Kylin,企业可以实现以下效益:
• 加速数据分析:Kylin的预计算机制显著减少了实时查询的计算量,提高了查询速度。
• 降低成本:利用AWS的按需付费和弹性扩展特性,企业可以根据实际需求灵活调整资源使用,降低IT投入成本。
• 提高系统稳定性:Kylin的分布式架构和高可用性设计确保了系统在高并发查询下的稳定运行。
示例代码
以下是一个在AWS EMR上创建Kylin Cube的示例代码:
CREATE CUBE my_cube
DIMENSIONS (
dimension1,
dimension2
)
MEASURES (
SUM(measure1),
COUNT(measure2)
)
PARTITIONED BY (partition_date);
此代码创建了一个名为my_cube的Cube,包含了两个维度dimension1和dimension2,以及两个度量SUM(measure1)和COUNT(measure2)。数据按partition_date进行分区。
以下是在AWS EMR上部署Apache Kylin并实现数据分析的具体流程与关键Python代码实现:
一、AWS EMR集群创建(Python自动化)
使用boto3
库自动化创建EMR集群:
import boto3
def create_emr_cluster():
emr = boto3.client('emr', region_name='us-west-2')
response = emr.run_job_flow(
Name='Kylin-EMR-Cluster',
ReleaseLabel='emr-6.8.0', # 确保支持Kylin
Applications=[
{'Name': 'Hadoop'},
{'Name': 'Hive'},
{'Name': 'HBase'}
],
Instances={
'InstanceGroups': [
{
'Name': 'MasterNode',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
},
{
'Name': 'CoreNodes',
'Market': 'SPOT', # 使用Spot实例降低成本
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': 2,
}
],
'Ec2KeyName': 'your-key-pair',
'KeepJobFlowAliveWhenNoSteps': True,
'Ec2SubnetId': 'subnet-xxxxxx'
},
BootstrapActions=[
{
'Name': 'Install-Kylin',
'ScriptBootstrapAction': {
'Path': 's3://your-bucket/install-kylin.sh' # 引导脚本自动安装Kylin
}
}
],
ServiceRole='EMR_DefaultRole',
JobFlowRole='EMR_EC2_DefaultRole'
)
return response['JobFlowId']
# 执行创建
cluster_id = create_emr_cluster()
print(f"Cluster created with ID: {cluster_id}")
二、Kylin安装引导脚本(install-kylin.sh)
#!/bin/bash
# 下载并解压Kylin
wget https://archive.apache.org/dist/kylin/apache-kylin-3.1.2/apache-kylin-3.1.2-bin-hbase1x.tar.gz
tar -xzf apache-kylin-3.1.2-bin-hbase1x.tar.gz -C /opt/
mv /opt/apache-kylin-3.1.2-bin-hbase1x /opt/kylin
# 配置环境变量
echo 'export KYLIN_HOME=/opt/kylin' >> /etc/profile
echo 'export PATH=$KYLIN_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
# 替换HBase兼容性JAR(根据EMR版本调整)
cp /usr/lib/hbase/lib/*.jar /opt/kylin/ext/
# 启动Kylin服务
kylin.sh start
三、Hive表创建(指向S3数据)
使用pyhive
连接Hive并定义外部表:
from pyhive import hive
conn = hive.Connection(host='emr-master-node-ip', port=10000)
cursor = conn.cursor()
# 创建外部表指向S3数据
cursor.execute('''
CREATE EXTERNAL TABLE IF NOT EXISTS sales_data (
transaction_id STRING,
product_id STRING,
sale_amount DOUBLE,
transaction_date DATE
)
STORED AS PARQUET
LOCATION 's3://your-bucket/sales-data/'
''')
print("Hive table created successfully.")
四、Kylin Cube创建(REST API调用)
使用requests
调用Kylin API创建Cube:
import requests
import json
kylin_url = 'http://<emr-master-ip>:7070/kylin/api'
headers = {'Content-Type': 'application/json', 'Authorization': 'Basic YWRtaW46S1lMSU4='} # 默认admin/KYLIN
# 1. 创建项目
project_payload = {"name": "Sales_Project"}
requests.post(f'{kylin_url}/projects', headers=headers, data=json.dumps(project_payload))
# 2. 创建数据模型
model_payload = {
"name": "sales_model",
"project": "Sales_Project",
"fact_table": "SALES_DATA",
"lookups": [],
"dimensions": [
{"table": "SALES_DATA", "column": "PRODUCT_ID"},
{"table": "SALES_DATA", "column": "TRANSACTION_DATE"}
],
"metrics": ["SUM(SALE_AMOUNT)", "COUNT(TRANSACTION_ID)"],
"partition_desc": {"partition_date_column": "TRANSACTION_DATE"}
}
requests.post(f'{kylin_url}/models', headers=headers, data=json.dumps(model_payload))
# 3. 创建Cube
cube_payload = {
"name": "sales_cube",
"model_name": "sales_model",
"dimensions": [
{"name": "PRODUCT_ID", "table": "SALES_DATA", "column": "PRODUCT_ID"},
{"name": "TRANSACTION_DATE", "table": "SALES_DATA", "column": "TRANSACTION_DATE"}
],
"measures": [
{"name": "TOTAL_SALES", "function": {"expression": "SUM(SALE_AMOUNT)"}},
{"name": "TRANSACTION_COUNT", "function": {"expression": "COUNT(TRANSACTION_ID)"}}
],
"partition_date_start": "2023-01-01",
"auto_merge_time_ranges": [7, 30]
}
response = requests.post(f'{kylin_url}/cubes', headers=headers, data=json.dumps(cube_payload))
print("Cube创建状态:", response.status_code)
五、触发Cube构建与查询
# 触发Cube构建
build_payload = {
"startTime": "2023-01-01",
"endTime": "2023-12-31",
"buildType": "BUILD"
}
requests.put(f'{kylin_url}/cubes/sales_cube/build', headers=headers, data=json.dumps(build_payload))
# 执行SQL查询
query = """
SELECT PRODUCT_ID, SUM(SALE_AMOUNT)
FROM SALES_DATA
WHERE TRANSACTION_DATE BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY PRODUCT_ID
"""
result = requests.post(f'{kylin_url}/query', headers=headers, data=json.dumps({"sql": query}))
print("查询结果:", result.json())
关键要点说明
- 自动化部署:通过boto3和引导脚本实现EMR集群与Kylin的一键部署。
- 数据准备:Hive表直接映射S3数据,避免数据迁移。
- Cube优化:按日期分区和自动合并策略提升查询性能。
- 成本控制:使用Spot实例和EMR自动伸缩降低资源成本。
- 安全实践:在AWS中配置VPC和安全组限制访问来源IP。
实际部署时需替换代码中的占位符(如S3路径、EMR主节点IP),并根据数据规模调整EMR集群配置。