springboot第83集:理解SaaS多租户应用的架构和设计,设备介入,网关设备,安全,实时实现,序列化...
springboot第83集:理解SaaS多租户应用的架构和设计,设备介入,网关设备,安全,实时实现,序列化,数据交换,存储与查询,流处理,消息队列
[什么是多租户]
聊到PaaS,SaaS,就不得不谈到多租户。
多租户指一套系统能够支撑多个租户。一个租户通常是具有相似访问模式和权限的一组用户,典型的租户是同一个组织或者公司的若干用户。
要实现多租户,首先需要考虑的是数据层面的多租户。数据层的多租户模型对上层服务和应用的多租户实现有突出影响。本文重点介绍数据层多租户对各种多租户模型的支持。
权衡不同的多租户实现方式时,需要考虑如下因素:
扩展性:租户数量级别,以及未来发展趋势
安全性:租户之间数据隔离级别要求
资源共享:多租户通常有某种形式的资源共享,需要避免某个租户的糟糕SQL吃掉系统资源,影响其他租户的响应时间
灵活性:不同租户可能有不同的需求,对特定租户需求的扩展能力
跨租户分析和优化:对全部租户或者多个租户的数据和行为进行分析的能力
运维和管理:运维管理的复杂度和便宜性,包括监控、修改数据库模式、创建索引、收集统计数据、数据加载等
成本:总体拥有成本,包括方案实现成本、运维成本等
[多租户模型]
多租户模型描述了租户和该租户的数据之间的映射关系。不同的多租户模型会影响数据库和应用程序的设计、管理和维护。
[一租户一数据库]
最简单的多租户实现方式是为每一个租户创建一个数据库,如下图所示。应用程序为每个租户分配一个租户id,并为每个租户配置相应的数据库连接信息(包括数据库ip、端口等)。应用程序根据租户id连接到为其分配的数据库。
这种模型中不同租户的数据物理隔离,安全级别高。如果每个租户的数据库使用不同的硬件和数据库类型,则他们之间的资源使用也是物理隔离的;如果租户的数据库共用同一套硬件,则需要对资源进行合理分配和管理,避免相互影响。由于不同租户使用独立的数据库,灵活性好,容易满足不同租户的特定需求(譬如需要额外的字段)。出现故障时影响面小。缺点是数据库数量大,维护复杂,拥有成本高。适合租户数目比较少的场景。
[一租户一名字空间(Schema/Namespace)]
多个租户共享同一个数据库,每个租户拥有独立的名字空间(或模式)。应用程序为每个租户分配一个id,并把每个租户的所有操作限制在为其分配的名字空间/模式之中。如下图所示。
这种多租户模型下,不同租户的数据逻辑上相互隔离,安全控制相对简单。不同租户有不同的模式,可以简便的满足不同租户的特定需求,灵活性高。对资源管理能力要求高,以避免不同租户竞争资源。可以把不同租户的数据存储在不同的磁盘上,降低了对磁盘IO的竞争。运维和管理较复杂,不易实现大量租户的跨租户分析。适合租户数目适中的场景。
[全共享方式]
不同租户共享同一个数据库、同一个名字空间。不同租户的数据在同一组表中共存,通过租户id标记和访问不同租户的数据(应用需要调整访问数据的SQL以包含租户id)。
这种多租户模型中,不同租户的数据物理存储在一起,对系统的资源隔离和安全隔离要求很高。运维相对简单。扩展能力好,可以支持较多数量租户。由于租户数据存储在一起,跨租户数据分析和优化非常简单。成本低,可以较低的代价支持更多的租户。
全共享模型中,很多数据库采用添加大量自定义字段的方式满足不同租户的特定需求,以提高灵活性。这种方式有诸多局限性,譬如字段数目不能太多、管理复杂等。支持更多半结构化数据,包括JSON 等,通过这种半结构化数据,可以更灵活、高效、便捷的满足不同租户的特定需求。
[无限可能的MQTT]
发送命令远程控制:
读取和发布数据:
[组成和基本概念]
MQTT是有以下几部分组成:
发布(Publish)/订阅(Subscribe)
消息(Message)
主题(Topics)
代理(Broker)
[发布(Publish)/订阅(Subscribe)]
(代理)Broker有三个主要作用:
接受所有的消息
过滤消息
发布消息到所有订阅的客户端
Spring Boot 基本已经一统 Java 项目的开发,大量的开源项目都实现了其的 Starter 启动器。例如:
incubator-dubbo-spring-boot-project
启动器,可以快速配置 Dubbo 。rocketmq-spring-boot-starter
启动器,可以快速配置 RocketMQ 。
Modbus是一种串行通讯协议,是Modicon公司(现在的施耐德电气 Schneider Electric) 于1979年为使用可编程逻辑控制器(PLC)通信而发表。Modbus已经成为工业领域通信协议事实上的业界标准,并且现在是工业电子设备之间常见的连接方式。
Modbus在工业环境下很流行,因为它是公开发布而免版税的。它是为工业应用开发的,与其他标准相比,它相对易于部署和维护,除了要传输的数据格式的大小外,几乎没有其他限制。Modbus使用RS485作为其物理层。
Modbus支持连接到同一网络的许多设备之间进行通信,例如,一个测量温度和湿度并将结果发送给服务器的系统中,Modbus通常用于在监控和数据采集(SCADA)系统中将计算机或服务器与远程终端单元(RTU)连接。许多数据类型是根据梯形逻辑(一种通过基于继电器逻辑电路图的图形来代表程序的一种编程语言)的行业用法机及其在驱动继电器中的用途来命名的: 单位物理输出称为线圈,单位物理输入称为离散输入或触点。
[Modbus协议及其物理媒体]
Modbus是描述消息通信对话框的开放标准。
Modbus通过多种类型的物理介质进行通信,例如:
串行RS-232
串行RS-485
串行RS-422
以太网
最初的Modbus接口在RS-232串行通信上运行,但是大多数后来的Modbus实现使用RS-485,因为它允许:
距离更长。
更高的速度。
单个多点网络中可能有多个设备。
由Infinite Automation Systems和Serotonin Software用Java编写的Modbus协议的高性能和易用性实现。支持ASCII,RTU,TCP和UDP传输作为从属或主用,自动请求分区和响应数据类型解析。
[RPC框架原理]
RPC 框架的目标就是让远程服务调用更加简单、透明,RPC 框架负责屏蔽底层的传输方式(TCP 或者 UDP)、序列化方式(XML/Json/ 二进制)和通信细节。服务调用者可以像调用本地接口一样调用远程的服务提供者,而不需要关心底层通信细节和调用过程。
RPC 框架的调用原理图如下所示:
Broker: 消息处理中心,负责消息的接受、存储、转发等。
Producer: 消息生产者,负责产生和发送消息和消息处理中心。
Consumer: 消息消费者,负责从消息处理中心获取消息,并进行相应的处理。
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
public final class InMemoryStorage {
//保存消息数据的容器,<topic,消息阻塞队列> 键值对
private final ConcurrentHashMap<String, BlockingQueque<QueueMsg>> storage;
private static InMemoryStorage instance;
private InMemoryStorage() {
storage = new ConcurrentHashMap<>();
}
//利用双重检查加锁(double-checked locking),首先检查是否示例已经创建了,如果尚未创建,"才"进行同步。这样以来,只有第一次会同步,这正是我们想要的。
public static InMemoryStorage getInstance() {
if (instance == null) {
synchronized (InMemoryStorage.class) {
if (instance == null) {
instance = new InMemoryStorage();
}
}
}
return instance;
}
//保存消息到主题中,若topic对应的value为空,会将第二个参数的返回值存入并返回
public boolean put(String topic, QueueMsg msg) {
return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingDeque<>()).add(msg);
}
//获得主题中的消息
public <T extends QueueMsg> List<T> get(String topic) {
//判断map中是否包含此topic
if (storage.containsKey(topic)) {
List<T> entities;
//从此主题对应的阻塞队列中出队一个元素
T first = (T) storage.get(topic).poll();
if (first != null) {
entities = new ArrayList<>();
entities.add(first);
List<QueueMsg> otherList = new ArrayList<>();
//移动阻塞队列中最大999个元素到arrayList中
storage.get(topic).drainTo(otherList, 999);
for (QueueMsg other : otherList) {
entities.add((T) other);
}
} else {
entities = Collections.emptyList();
}
}
return Collections.emptyList();
}
//删除此map中所有的键值对
public void cleanup() {
storage.clear();
}
}
作为一个消息处理中心中,至少要有一个数据容器用来保存接受到的消息。
[消息格式定义]
队列消息接口定义(QueueMsg)
public interface QueueMsg {
//消息键
String getKey();
//消息头
QueueMsgHeaders getHeaders();
//消息负载byte数组
byte[] getData();
}
队列消息头接口定义(QueueMsgHeaders)
import java.util.Map;
public interface QueueMsgHeaders {
//消息头放入
byte[] put(String key, byte[] value);
//消息头通过key获取byte数组
byte[] get(String key);
//消息头数据全部读取方法
Map<String, byte[]> getData();
}
队列消息格式(ProtoQueueMsg)
public class ProtoQueueMsg implements QueueMsg {
private final String key;
private final String value;
private final QueueMsgHeaders headers;
public ProtoQueueMsg(String key, String value) {
this(key, value, new DefaultQueueMsgHeaders());
}
public ProtoQueueMsg(String key, String value, QueueMsgHeaders headers) {
this.key = key;
this.value = value;
this.headers = headers;
}
@Override
public String getKey() {
return key;
}
@Override
public QueueMsgHeaders getHeaders() {
return headers;
}
@Override
public byte[] getData() {
return value.getBytes();
}
}
默认队列消息头(DefaultQueueMsgHeaders)
import java.util.HashMap;
import java.util.Map;
protected final Map<String, byte[]> data = new HashMap<>();
@Override
public byte[] put(String key, byte[] value) {
return data.put(key, value);
}
@Override
public byte[] get(String key) {
return data.get(key);
}
@Override
public Map<String, byte[]> getData() {
return data;
}
}
[消息生产者]
import iot.technology.mqtt.storage.msg.QueueMsg;
import iot.technology.mqtt.storage.queue.QueueCallback;
public class Producer<T extends QueueMsg> {
private final InMemoryStorage storage = InMemoryStorage.getInstance();
private final String defaultTopic;
public Producer(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
public void send(String topicName, T msg) {
boolean result = storage.put(topicName, msg);
}
}
[消息消费者]
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
public class Consumer<T extends QueueMsg> {
private final InMemoryStorage storage = InMemoryStorage.getInstance();
private volatile Set<String> topics;
private volatile boolean stopped;
private volatile boolean subscribed;
private final String topic;
//虚构函数
public Consumer(String topic) {
this.topic = topic;
stopped = false;
}
public String getTopic() {
return topic;
}
public void subscribe() {
topics = Collections.singleton(topic);
subscribed = true;
}
//批量订阅主题
public void subscribe(Set<String> topics) {
this.topics = topics;
subscribed = true;
}
public void unsubscribe() {
stopped = true;
}
//不断读取topic集合下阻塞队列中的数据集合
public List<T> poll(long durationInMillis) {
if (subscribed) {
List<T> messages = topics
.stream()
.map(storage::get)
.flatMap(List::stream)
.map(msg -> (T) msg).collect(Collectors.toList());
if (messages.size() > 0) {
return messages;
}
try {
Thread.sleep(durationInMillis);
} catch (InterruptedException e) {
if (!stopped) {
log.error("Failed to sleep.", e);
}
}
}
return Collections.emptyList();
}
}
内存型消息队列
加群联系作者vx:xiaoda0423
仓库地址:https://github.com/webVueBlog/JavaGuideInterview