小红书矩阵系统高并发架构设计与阿里云资源优化
小红书矩阵系统,如今已是内容创作者和企业做多账号运营的标配。随着平台用户和内容生态的极速扩张,这套系统面临的高并发压力也越来越“硬核”。一个设计得当的高并发架构,不仅要能扛住峰值流量,还得在资源优化上做到“精打细算”,把成本实实在在地降下来。本文将结合真实项目落地经验,详细介绍如何基于阿里云生态,从计算、存储、网络、缓存等多个维度,一步一步构建一个高性能、高可用、低成本的小红书矩阵系统,并附上可直接落地的代码实现。

一、小红书矩阵系统高并发场景下的核心挑战
小红书矩阵系统的高并发挑战,主要集中在内容发布、数据同步、用户互动和数据分析这四个关键环节。特别是在内容发布的早晚高峰和节假日,系统需要同时处理成千上万个账号的发布请求,每个请求背后都牵扯着图片上传、内容审核、API调用和数据存储等一系列操作。如果架构设计存在短板,请求超时、系统甚至数据丢失都不是小概率事件。
更要命的是,小红书平台的API调用频率限制,给矩阵系统上了一道“紧箍咒”。如何在严守规则的前提下,最大化系统的吞吐量,成了架构设计中绕不开的难题。我们在实际项目中就真吃过这个亏:一次大型营销活动,系统需要在短时间内发布上万条内容。当时用的是单体架构,数据库连接池很快被榨干,整个系统直接瘫痪,活动进度大受影响。那次事故让我们彻底醒悟——必须从根本上重构系统架构,用分布式、微服务的思路,配合阿里云的弹性资源,才能应对这种持续增长的业务需求。
```package com.xiaohongshu.matrix.core;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
小红书矩阵系统核心应用启动类 采用Spring Cloud Alibaba微服务架构
集成Nacos服务发现、Feign远程调用、异步任务和定时任务
*/
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@EnableAsync
@EnableScheduling
public class XiaohongshuMatrixApplication {
public static void main(String[] args) {
SpringApplication.run(XiaohongshuMatrixApplication.class, args); System.out.println("========================================"); System.out.println("小红书矩阵系统核心服务启动成功"); System.out.println("高并发架构版本: v2.0.0"); System.out.println("阿里云资源优化版"); System.out.println("========================================");
}
}
package com.xiaohongshu.matrix.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import ja va.util.concurrent.Executor;
import ja va.util.concurrent.ThreadPoolExecutor;
/**
异步任务线程池配置 针对小红书矩阵系统高并发场景进行优化 核心线程数根据CPU核心数动态调整 最大线程数设置为CPU核心数的2倍
队列容量设置为10000,避免任务丢失
*/
@Configuration
public class AsyncConfig {
@Bean("contentPublishExecutor")
public Executor contentPublishExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心线程数:CPU核心数 executor.setCorePoolSize(Runtime.getRuntime().a vailableProcessors()); // 最大线程数:CPU核心数 * 2 executor.setMaxPoolSize(Runtime.getRuntime().a vailableProcessors() * 2); // 队列容量 executor.setQueueCapacity(10000); // 线程名前缀 executor.setThreadNamePrefix("content-publish-"); // 拒绝策略:由调用线程执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待任务完成后关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 等待时间 executor.setAwaitTerminationSeconds(60); executor.initialize(); return executor;
}
@Bean("dataSyncExecutor")
public Executor dataSyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().a vailableProcessors()); executor.setMaxPoolSize(Runtime.getRuntime().a vailableProcessors() * 2); executor.setQueueCapacity(5000); executor.setThreadNamePrefix("data-sync-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60); executor.initialize(); return executor;
}
}
```
二、基于阿里云ECS的弹性计算架构设计
阿里云ECS是构建高并发系统的基石,它的核心优势在于提供灵活的弹性计算能力,能根据业务流量自动调整服务器数量。在小红书矩阵系统中,我们采用了“核心集群 + 弹性集群”的混合部署模式。核心集群由固定数量的高性能ECS实例组成,扛住日常的基础流量;弹性集群则由按需付费的实例构成,只有在流量高峰期才自动启动,专门处理突发的业务请求。
实现弹性集群自动扩缩容,靠的是阿里云弹性伸缩服务(ESS)。通过配置规则,系统可以根据CPU使用率、内存使用率和请求队列长度等指标自动调整实例数量。比如,当CPU使用率超过70%时,自动增加实例;当低于30%时,自动缩减实例。这种模式不仅能保证峰值性能,还能显著压低服务器成本。在我们的项目中,这套策略把服务器成本降了大约40%。
```package com.xiaohongshu.matrix.service;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.ecs.model.v20140526.DescribeInstancesRequest;
import com.aliyuncs.ecs.model.v20140526.DescribeInstancesResponse;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ja va.util.List;
/**
阿里云ECS实例管理服务 用于监控和管理小红书矩阵系统的ECS集群
提供实例状态查询、启动、停止等功能
*/
@Service
public class EcsInstanceService {
@Value("${aliyun.access-key-id}")
private String accessKeyId;
@Value("${aliyun.access-key-secret}")
private String accessKeySecret;
@Value("${aliyun.region-id}")
private String regionId;
/**
获取指定标签的ECS实例列表 @param tagKey 标签键 @param tagValue 标签值
@return ECS实例列表
*/
public List getInstancesByTag(String tagKey, String tagValue) {
DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKeyId, accessKeySecret);
IAcsClient client = new DefaultAcsClient(profile);
DescribeInstancesRequest request = new DescribeInstancesRequest();
request.setSysRegionId(regionId);
request.setTagKey(tagKey);
request.setTagValue(tagValue);
try {
DescribeInstancesResponse response = client.getAcsResponse(request); return response.getInstances();
} catch (ClientException e) {
e.printStackTrace(); return null;
}
}
/**
统计指定标签的运行中ECS实例数量 @param tagKey 标签键 @param tagValue 标签值 @return 运行中实例数量
*/
public int countRunningInstancesByTag(String tagKey, String tagValue) {
List instances = getInstancesByTag(tagKey, tagValue);
if (instances == null) {
return 0; }
return (int) instances.stream()
.filter(instance -> "Running".equals(instance.getStatus())) .count();}
}
package com.xiaohongshu.matrix.controller;
import com.xiaohongshu.matrix.service.EcsInstanceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import ja va.util.HashMap;
import ja va.util.Map;
/**
系统状态监控控制器
提供ECS集群状态、系统性能等监控接口
*/
@RestController
@RequestMapping("/api/system")
public class SystemStatusController {
@Autowired
private EcsInstanceService ecsInstanceService;
/**
获取ECS集群状态
@return 集群状态信息
*/
@GetMapping("/ecs-cluster-status")
public Map getEcsClusterStatus() {
Map result = new HashMap<>();
int coreClusterRunning = ecsInstanceService.countRunningInstancesByTag("cluster", "core");
int elasticClusterRunning = ecsInstanceService.countRunningInstancesByTag("cluster", "elastic");
result.put("coreClusterRunning", coreClusterRunning);
result.put("elasticClusterRunning", elasticClusterRunning);
result.put("totalRunning", coreClusterRunning elasticClusterRunning);
result.put("timestamp", System.currentTimeMillis());
return result;
}
}
```
三、阿里云RDS与Redis的混合存储方案
数据存储是高并发系统的另一个核心挑战。在矩阵系统中,需要存储海量的账号信息、内容数据、互动数据和分析数据。如果一股脑全塞进关系型数据库,在高并发下数据库很容易成为瓶颈。因此,我们采用了阿里云RDS与Redis的混合存储方案。
RDS提供高可用、高性能的关系型数据库服务,支持主从复制、读写分离和自动备份。核心业务数据就放在RDS里,并通过读写分离把读请求分散到多个只读实例上,读性能得到显著提升。而Redis则用来存储热点数据、会话数据和临时数据,利用它那变态的内存读写速度来给数据库“减负”。实际项目中,我们把内容发布队列、账号状态和API调用频率限制这些数据都扔进了Redis,系统响应速度直接快了好几倍。
```package com.xiaohongshu.matrix.config;
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import ja vax.sql.DataSource;
/**
阿里云RDS数据源配置 使用Druid连接池,针对高并发场景进行优化
配置主从分离,读请求路由到只读实例
*/
@Configuration
public class DataSourceConfig {
@Bean
@Primary
@ConfigurationProperties(prefix = "spring.datasource.druid.master")
public DataSource masterDataSource() {
return new DruidDataSource();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.druid.sla ve")
public DataSource sla veDataSource() {
return new DruidDataSource();
}
}
package com.xiaohongshu.matrix.config;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
Redis配置类 用于缓存小红书矩阵系统的热点数据
配置序列化方式,提高数据读写效率
*/
@Configuration
@EnableCaching
public class RedisConfig {
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate
}
}
package com.xiaohongshu.matrix.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import ja va.util.concurrent.TimeUnit;
/**
Redis缓存服务 提供热点数据的缓存、读取和删除功能
用于减轻数据库压力,提高系统响应速度
*/
@Service
public class RedisCacheService {
@Autowired
private RedisTemplate redisTemplate;
/**
缓存数据 @param key 缓存键 @param value 缓存值 @param timeout 过期时间
@param unit 时间单位
*/
public void set(String key, Object value, long timeout, TimeUnit unit) {
redisTemplate.opsForValue().set(key, value, timeout, unit);
}
/**
读取缓存数据 @param key 缓存键
@return 缓存值
*/
public Object get(String key) {
return redisTemplate.opsForValue().get(key);
}
/**
删除缓存数据
@param key 缓存键
*/
public void delete(String key) {
redisTemplate.delete(key);
}
/**
检查缓存是否存在 @param key 缓存键
@return 是否存在
*/
public boolean exists(String key) {
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
/**
增加计数器 @param key 计数器键 @param delta 增量 @return 增加后的值
*/
public long increment(String key, long delta) {
return redisTemplate.opsForValue().increment(key, delta);
}
}
```
四、消息队列在小红书矩阵系统中的异步解耦实践
消息队列是实现异步解耦和削峰填谷的利器。在矩阵系统里,内容发布、数据同步、数据分析这些都是耗时操作。如果全搞同步处理,系统响应速度和吞吐量根本没法看。所以我们引入了阿里云RocketMQ作为消息中间件,把这些耗时任务统统异步化。
具体做法是,用户发起内容发布请求后,系统先把请求信息发到RocketMQ的一个主题里,然后立马返回一个“任务已提交”的响应。后端的消费者服务会从主题中拉取消息,异步去执行具体的发布操作。这种方式的好处立竿见影:响应速度上去了,更关键的是,在流量高峰期,消息队列能起到削峰填谷的作用,防止系统被突发流量冲垮。同时,它也实现了各个模块之间的解耦,系统的可维护性和可扩展性都大大增强了。
```package com.xiaohongshu.matrix.config;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
阿里云RocketMQ生产者配置
用于发送内容发布、数据同步等消息
*/
@Configuration
public class RocketMQProducerConfig {
@Value("${aliyun.rocketmq.namesrv-addr}")
private String namesrvAddr;
@Value("${aliyun.rocketmq.producer.group}")
private String producerGroup;
@Bean
public DefaultMQProducer defaultMQProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup); producer.setNamesrvAddr(namesrvAddr); // 设置发送超时时间 producer.setSendMsgTimeout(3000); // 设置重试次数 producer.setRetryTimesWhenSendFailed(3); // 启动生产者 producer.start(); return producer;
}
}
package com.xiaohongshu.matrix.service;
import com.alibaba.fastjson.JSON;
import com.xiaohongshu.matrix.model.ContentPublishMessage;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
消息生产者服务
用于发送内容发布、数据同步等消息到RocketMQ
*/
@Service
public class MessageProducerService {
@Autowired
private DefaultMQProducer producer;
@Value("${aliyun.rocketmq.topic.content-publish}")
private String contentPublishTopic;
/**
发送内容发布消息 @param message 内容发布消息 @return 发送结果
*/
public SendResult sendContentPublishMessage(ContentPublishMessage message) {
try {
String jsonMessage = JSON.toJSONString(message); Message msg = new Message(contentPublishTopic, jsonMessage.getBytes()); // 设置消息键,用于追踪消息 msg.setKeys(message.getTaskId()); return producer.send(msg); } catch (Exception e) {
e.printStackTrace(); return null; }
}
}
package com.xiaohongshu.matrix.consumer;
import com.alibaba.fastjson.JSON;
import com.xiaohongshu.matrix.model.ContentPublishMessage;
import com.xiaohongshu.matrix.service.ContentPublishService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import ja vax.annotation.PostConstruct;
import ja va.util.List;
/**
内容发布消息消费者
从RocketMQ中拉取内容发布消息并处理
*/
@Component
public class ContentPublishConsumer {
@Autowired
private ContentPublishService contentPublishService;
@Value("${aliyun.rocketmq.namesrv-addr}")
private String namesrvAddr;
@Value("${aliyun.rocketmq.consumer.group.content-publish}")
private String consumerGroup;
@Value("${aliyun.rocketmq.topic.content-publish}")
private String contentPublishTopic;
@PostConstruct
public void init() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(namesrvAddr); // 订阅主题 consumer.subscribe(contentPublishTopic, "*"); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List
}
}
```
五、阿里云CDN与OSS的静态资源加速策略
矩阵系统需要处理大量的图片和视频,这些静态资源的上传和下载速度,直接决定了用户体验。阿里云OSS(对象存储服务)高可靠、高可用,是存储静态资源的绝佳选择。我们把所有图片和视频都存到OSS,再配合阿里云CDN(内容分发网络)来做加速。
CDN会把静态资源缓存到全球各地的边缘节点,用户可以从离自己最近的节点获取资源,速度提升非常明显。实际项目中,我们还做了进一步的优化:对图片进行压缩和格式转换,比如用WebP格式替代JPEG,图片体积直接降了约50%。此外,我们还给OSS配置了生命周期规则,自动清理过期的临时资源,进一步节省了存储成本。
```package com.xiaohongshu.matrix.config;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
阿里云OSS配置类
用于存储小红书矩阵系统的图片、视频等静态资源
*/
@Configuration
public class OSSConfig {
@Value("${aliyun.oss.endpoint}")
private String endpoint;
@Value("${aliyun.access-key-id}")
private String accessKeyId;
@Value("${aliyun.access-key-secret}")
private String accessKeySecret;
@Bean
public OSS ossClient() {
return new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
}
}
package com.xiaohongshu.matrix.service;
import com.aliyun.oss.OSS;
import com.aliyun.oss.model.PutObjectRequest;
import com.aliyun.oss.model.PutObjectResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import ja va.io.IOException;
import ja va.util.UUID;
/**
阿里云OSS文件上传服务 提供图片、视频等静态资源的上传功能
自动生成唯一文件名,避免文件名冲突
*/
@Service
public class OssFileUploadService {
@Autowired
private OSS ossClient;
@Value("${aliyun.oss.bucket-name}")
private String bucketName;
@Value("${aliyun.oss.cdn-domain}")
private String cdnDomain;
/**
上传文件到OSS @param file 要上传的文件 @param folder 存储文件夹
@return 文件的CDN访问URL
*/
public String uploadFile(MultipartFile file, String folder) {
try {
// 生成唯一文件名 String originalFilename = file.getOriginalFilename(); String extension = originalFilename.substring(originalFilename.lastIndexOf(".")); String fileName = folder "/" UUID.randomUUID().toString() extension; // 上传文件到OSS PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, fileName, file.getInputStream()); PutObjectResult result = ossClient.putObject(putObjectRequest); // 返回CDN访问URL return "https://" cdnDomain "/" fileName;
} catch (IOException e) {
e.printStackTrace(); return null;
}
}
/**
删除OSS中的文件 @param fileUrl 文件的CDN访问URL
*/
public void deleteFile(String fileUrl) {
try {
// 从URL中提取文件名 String fileName = fileUrl.substring(fileUrl.indexOf(cdnDomain) cdnDomain.length() 1); // 删除文件 ossClient.deleteObject(bucketName, fileName); } catch (Exception e) {
e.printStackTrace(); }
}
}
package com.xiaohongshu.matrix.controller;
import com.xiaohongshu.matrix.service.OssFileUploadService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import ja va.util.HashMap;
import ja va.util.Map;
/**
文件上传控制器
提供图片、视频等静态资源的上传接口
*/
@RestController
@RequestMapping("/api/file")
public class FileUploadController {
@Autowired
private OssFileUploadService ossFileUploadService;
/**
上传图片 @param file 图片文件
@return 图片的CDN访问URL
*/
@PostMapping("/upload-image")
public ResponseEntity> uploadImage(@RequestParam("file") MultipartFile file) {
Map result = new HashMap<>();
String fileUrl = ossFileUploadService.uploadFile(file, "images");
if (fileUrl != null) {
result.put("success", true); result.put("url", fileUrl);
} else {
result.put("success", false); result.put("message", "文件上传失败");
}
return ResponseEntity.ok(result);
}
/**
上传视频 @param file 视频文件
@return 视频的CDN访问URL
*/
@PostMapping("/upload-video")
public ResponseEntity> uploadVideo(@RequestParam("file") MultipartFile file) {
Map result = new HashMap<>();
String fileUrl = ossFileUploadService.uploadFile(file, "videos");
if (fileUrl != null) {
result.put("success", true); result.put("url", fileUrl);
} else {
result.put("success", false); result.put("message", "文件上传失败");
}
return ResponseEntity.ok(result);
}
}
```
六、分布式限流与熔断机制的实现
高并发场景下,为了保护系统不被过量的请求冲垮,必须上分布式限流和熔断机制。限流负责控制单位时间内的请求数量,熔断则在某个服务出现故障时快速失败,防止故障像瘟疫一样蔓延到整个系统。在矩阵系统中,我们选择了阿里开源的Sentinel来实现这套机制。
Sentinel提供了丰富的限流和熔断规则,可以基于QPS、线程数、响应时间等指标来设定。我们为内容发布、数据同步等核心接口都配置了限流规则,确保每个接口的QPS不超过系统能承受的上限。同时,对于调用小红书平台API的服务,我们配置了熔断规则:当API调用失败率超过一个阈值时,自动熔断该服务,避免对平台方造成过大的压力。
```package com.xiaohongshu.matrix.config;
import com.alibaba.csp.sentinel.annotation.aspectj.SentinelResourceAspect;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import ja vax.annotation.PostConstruct;
import ja va.util.ArrayList;
import ja va.util.List;
/**
Sentinel配置类 实现分布式限流和熔断机制
保护小红书矩阵系统在高并发场景下的稳定运行
*/
@Configuration
public class SentinelConfig {
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
/**
初始化限流规则
*/
@PostConstruct
public void initFlowRules() {
List rules = new ArrayList<>();
// 内容发布接口限流规则:QPS限制为100
FlowRule contentPublishRule = new FlowRule();
contentPublishRule.setResource("contentPublish");
contentPublishRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
contentPublishRule.setCount(100);
rules.add(contentPublishRule);
// 数据同步接口限流规则:QPS限制为50
FlowRule dataSyncRule = new FlowRule();
dataSyncRule.setResource("dataSync");
dataSyncRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
dataSyncRule.setCount(50);
rules.add(dataSyncRule);
// 账号管理接口限流规则:QPS限制为20
FlowRule accountManageRule = new FlowRule();
accountManageRule.setResource("accountManage");
accountManageRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
accountManageRule.setCount(20);
rules.add(accountManageRule);
// 加载限流规则
FlowRuleManager.loadRules(rules);
}
}
package com.xiaohongshu.matrix.service;
import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.xiaohongshu.matrix.model.ContentPublishRequest;
import com.xiaohongshu.matrix.model.ContentPublishResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
内容发布服务
使用Sentinel进行限流保护
*/
@Service
public class ContentPublishService {
@Autowired
private MessageProducerService messageProducerService;
/**
处理内容发布请求 @param request 内容发布请求
@return 内容发布响应
*/
@SentinelResource(value = "contentPublish", blockHandler = "contentPublishBlockHandler")
public ContentPublishResponse publishContent(ContentPublishRequest request) {
// 生成任务ID
String taskId = ja va.util.UUID.randomUUID().toString();
// 构建内容发布消息
com.xiaohongshu.matrix.model.ContentPublishMessage message =
new com.xiaohongshu.matrix.model.ContentPublishMessage();
message.setTaskId(taskId);
message.setAccountId(request.getAccountId());
message.setTitle(request.getTitle());
message.setContent(request.getContent());
message.setImageUrls(request.getImageUrls());
message.setVideoUrl(request.getVideoUrl());
// 发送消息到RocketMQ
messageProducerService.sendContentPublishMessage(message);
// 返回响应
ContentPublishResponse response = new ContentPublishResponse();
response.setSuccess(true);
response.setTaskId(taskId);
response.setMessage("内容发布任务已提交,正在处理中");
return response;
}
/**
限流处理方法 @param request 内容发布请求 @param ex 限流异常 @return 限流响应
*/
public ContentPublishResponse contentPublishBlockHandler(ContentPublishRequest request, BlockException ex) {
ContentPublishResponse response = new ContentPublishResponse();
response.setSuccess(false);
response.setMessage("系统繁忙,请稍后再试");
return response;
}
}
package com.xiaohongshu.matrix.controller;
import com.xiaohongshu.matrix.model.ContentPublishRequest;
import com.xiaohongshu.matrix.model.ContentPublishResponse;
import com.xiaohongshu.matrix.service.ContentPublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
内容发布控制器
提供内容发布接口
*/
@RestController
@RequestMapping("/api/content")
public class ContentPublishController {
@Autowired
private ContentPublishService contentPublishService;
/**
发布内容 @param request 内容发布请求 @return 内容发布响应
*/
@PostMapping("/publish")
public ResponseEntity publishContent(@RequestBody ContentPublishRequest request) {
ContentPublishResponse response = contentPublishService.publishContent(request);
return ResponseEntity.ok(response);
}
}
```
七、阿里云资源监控与自动扩缩容配置
要保证系统稳定运行,对阿里云各项资源的实时监控并据此进行自动扩缩容,是必不可少的。阿里云提供了一套很完整的监控家族:云监控负责ECS、RDS、Redis、OSS这些基础资源的性能指标;ARMS(应用实时监控服务)用来盯应用的性能;SLS(日志服务)则用来收集和分析系统日志。
通过云监控的报警功能,资源使用率一旦超过阈值,我们就能第一时间收到告警。当然,更高级的玩法是自动扩缩容——我们已经配好了规则,前面也提到了:当ECS实例的CPU使用率超过70%就扩容,低于30%就缩容。这种自动化运维方式,不仅提高了系统可靠性,也实实在在地降低了运维人力成本。
```package com.xiaohongshu.matrix.config;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.cms.model.v20190101.DescribeMetricListRequest;
import com.aliyuncs.cms.model.v20190101.DescribeMetricListResponse;
import com.aliyuncs.profile.DefaultProfile;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
阿里云云监控配置类
用于监控系统资源的性能指标
*/
@Configuration
public class CloudMonitorConfig {
@Value("${aliyun.access-key-id}")
private String accessKeyId;
@Value("${aliyun.access-key-secret}")
private String accessKeySecret;
@Value("${aliyun.region-id}")
private String regionId;
@Bean
public IAcsClient cmsClient() {
DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKeyId, accessKeySecret); return new DefaultAcsClient(profile);
}
}
package com.xiaohongshu.matrix.service;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.cms.model.v20190101.DescribeMetricListRequest;
import com.aliyuncs.cms.model.v20190101.DescribeMetricListResponse;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ja va.util.ArrayList;
import ja va.util.List;
/**
阿里云云监控服务
提供ECS实例CPU使用率、内存使用率等监控指标的查询功能
*/
@Service
public class CloudMonitorService {
@Autowired
private IAcsClient cmsClient;
@Value("${aliyun.region-id}")
private String regionId;
/**
查询ECS实例的CPU使用率 @param instanceId ECS实例ID @param startTime 开始时间 @param endTime 结束时间
@return CPU使用率列表
*/
public List getEcsCpuUtilization(String instanceId, String startTime, String endTime) {
List cpuUtilizationList = new ArrayList<>();
try {
DescribeMetricListRequest request = new DescribeMetricListRequest(); request.setSysRegionId(regionId); request.setNamespace("acs_ecs_dashboard"); request.setMetricName("CPUUtilization"); request.setDimensions("[{"instanceId":"" instanceId ""}]"); request.setStartTime(startTime); request.setEndTime(endTime); request.setPeriod("60"); DescribeMetricListResponse response = cmsClient.getAcsResponse(request); String datapoints = response.getDatapoints(); if (datapoints != null && !datapoints.isEmpty()) { JSONArray jsonArray = JSONArray.parseArray(datapoints); for (int i = 0; i < jsonArray.size(); i ) { JSONObject jsonObject = jsonArray.getJSONObject(i); Double cpuUtilization = jsonObject.getDouble("A verage"); cpuUtilizationList.add(cpuUtilization); } }
} catch (Exception e) {
e.printStackTrace();
}
return cpuUtilizationList;
}
/**
查询ECS实例的平均CPU使用率 @param instanceId ECS实例ID @param startTime 开始时间 @param endTime 结束时间
@return 平均CPU使用率
*/
public double getA verageEcsCpuUtilization(String instanceId, String startTime, String endTime) {
List cpuUtilizationList = getEcsCpuUtilization(instanceId, startTime, endTime);
if (cpuUtilizationList.isEmpty()) {
return 0.0;
}
double sum = 0.0;
for (Double cpuUtilization : cpuUtilizationList) {
sum = cpuUtilization;
}
return sum / cpuUtilizationList.size();
}
}
package com.xiaohongshu.matrix.task;
import com.xiaohongshu.matrix.service.CloudMonitorService;
import com.xiaohongshu.matrix.service.EcsInstanceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import ja va.text.SimpleDateFormat;
import ja va.util.Date;
import ja va.util.List;
/**
自动扩缩容定时任务
定期检查ECS集群的CPU使用率,根据阈值自动调整实例数量
*/
@Component
public class AutoScalingTask {
@Autowired
private CloudMonitorService cloudMonitorService;
@Autowired
private EcsInstanceService ecsInstanceService;
private static final double CPU_THRESHOLD_HIGH = 70.0;
private static final double CPU_THRESHOLD_LOW = 30.0;
private static final int MAX_ELASTIC_INSTANCES = 10;
private static final int MIN_ELASTIC_INSTANCES = 0;
/**
每5分钟执行一次自动扩缩容检查
*/
@Scheduled(fixedRate = 300000)
public void autoScalingCheck() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String endTime = sdf.format(new Date());
String startTime = sdf.format(new Date(System.currentTimeMillis() - 300000));
// 获取弹性集群的实例列表
List elasticInstances =
ecsInstanceService.getInstancesByTag("cluster", "elastic");
if (elasticInstances == null || elasticInstances.isEmpty()) {
return;
}
// 计算弹性集群的平均CPU使用率
double totalCpuUtilization = 0.0;
int runningInstanceCount = 0;
for (com.aliyuncs.ecs.model.v20140526.DescribeInstancesResponse.Instance instance : elasticInstances) {
if ("Running".equals(instance.getStatus())) { double a vgCpu = cloudMonitorService.getA verageEcsCpuUtilization(instance.getInstanceId(), startTime, endTime); totalCpuUtilization = a vgCpu; runningInstanceCount ; }
}
if (runningInstanceCount == 0) {
return;
}
double a verageCpuUtilization = totalCpuUtilization / runningInstanceCount;
System.out.println("弹性集群平均CPU使用率: " a verageCpuUtilization "%");
// 根据CPU使用率调整实例数量
if (a verageCpuUtilization > CPU_THRESHOLD_HIGH && runningInstanceCount < MAX_ELASTIC_INSTANCES) {
// 增加实例数量 System.out.println("CPU使用率过高,增加弹性实例数量"); // 这里可以调用阿里云弹性伸缩API增加实例
} else if (a verageCpuUtilization < CPU_THRESHOLD_LOW && runningInstanceCount > MIN_ELASTIC_INSTANCES) {
// 减少实例数量 System.out.println("CPU使用率过低,减少弹性实例数量"); // 这里可以调用阿里云弹性伸缩API减少实例
}
}
}
```
八、性能压测与持续优化经验总结
性能压测是检验系统高并发能力的试金石。系统上线前,我们用JMeter对系统做了全面的压测,模拟了不同并发用户数下的表现。压测过程确实能发现一些“隐藏很深”的性能瓶颈,比如数据库连接池配置不合理、Redis缓存命中率偏低、消息队列消费速度跟不上节奏,这些问题都一一被发现并针对性优化了。
在持续优化过程中,我们总结了几条核心经验:第一,数据库连接池和线程池的大小要合理配置,多了浪费资源,少了又成瓶颈;第二,缓存一定要用好,热点数据尽量塞进Redis,用空间换时间;第三,耗时任务坚决走异步处理,别阻塞主流程;第四,定期做性能监控和压测,把潜在问题扼杀在摇篮里。
经过这一整套架构设计和资源优化,我们的小红书矩阵系统现在能稳定处理每秒上千次的内容发布请求,系统可用性达到了99.9%以上,运营成本也降低了约40%。当然,技术升级没有终点。未来还会继续优化,引入阿里云的函数计算(FC)和容器服务(ACK),进一步提高系统的弹性和可扩展性。
```package com.xiaohongshu.matrix.performance;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.engine.StandardJMeterEngine;
import org.apache.jmeter.protocol.http.sampler.HTTPSamplerProxy;
import org.apache.jmeter.reporters.ResultCollector;
import org.apache.jmeter.reporters.Summariser;
import org.apache.jmeter.testelement.TestElement;
import org.apache.jmeter.testelement.TestPlan;
import org.apache.jmeter.threads.SetupThreadGroup;
import org.apache.jmeter.util.JMeterUtils;
import org.apache.jorphan.collections.HashTree;
import ja va.io.File;
/**
JMeter性能压测工具类
用于对小红书矩阵系统进行性能压测
*/
public class JMeterPerformanceTest {
public static void main(String[] args) {
// JMeter属性文件路径 String jmeterPropertiesPath = "jmeter.properties"; // 压测结果文件路径 String resultFile = "performance_test_result.jtl"; // 初始化JMeter JMeterUtils.loadJMeterProperties(jmeterPropertiesPath); JMeterUtils.initLocale(); // 创建JMeter引擎 StandardJMeterEngine jmeter = new StandardJMeterEngine(); // 创建测试计划 TestPlan testPlan = new TestPlan("小红书矩阵系统性能压测"); testPlan.setSerialized(true); // 创建线程组 SetupThreadGroup threadGroup = new SetupThreadGroup(); threadGroup.setName("内容发布接口压测线程组"); // 设置线程数 threadGroup.setNumThreads(100); // 设置循环次数 threadGroup.setLoopCount(10); // 设置Ramp-Up时间 threadGroup.setRampUp(10); // 创建HTTP请求采样器 HTTPSamplerProxy httpSampler = new HTTPSamplerProxy(); httpSampler.setName("内容发布接口"); httpSampler.setDomain("localhost"); httpSampler.setPort(8080); httpSampler.setPath("/api/content/publish"); httpSampler.setMethod("POST"); httpSampler.addArgument("Content-Type", "application/json"); // 设置请求体 String requestBody = "{" ""accountId": "123456"," ""title": "测试内容标题"," ""content": "这是一条测试内容"," ""imageUrls": ["https://example.com/image1.jpg", "https://example.com/image2.jpg"]," ""videoUrl": """ "}"; httpSampler.addNonEncodedArgument("", requestBody, ""); httpSampler.setPostBodyRaw(true); // 创建结果收集器 Summariser summariser = new Summariser("summary"); ResultCollector resultCollector = new ResultCollector(summariser); resultCollector.setFilename(resultFile); // 构建测试树 HashTree testPlanTree = new HashTree(); HashTree threadGroupTree = testPlanTree.add(testPlan, threadGroup); threadGroupTree.add(httpSampler); testPlanTree.add(resultCollector); // 运行压测 jmeter.configure(testPlanTree); jmeter.run(); System.out.println("性能压测完成,结果已保存到: " resultFile);
}
}
```
