建设银行网站怎么修改手机号码吗,南京网站seo服务,可以做淘宝推广的网站,西部虚拟主机网站后台不能访问一、 背景与目标监控最主要还是上报到Prometheus#xff0c;可惜成本实在是高昂#xff0c;特别是存储时间要求得越久#xff0c;我们这里探索了下micrometer转换成json#xff0c;然后存储到starrocks这类mpp olap引擎#xff0c;通过starrocks的存算分离架构#xff0c…一、 背景与目标监控最主要还是上报到Prometheus可惜成本实在是高昂特别是存储时间要求得越久我们这里探索了下micrometer转换成json然后存储到starrocks这类mpp olap引擎通过starrocks的存算分离架构降低成本。二、 总体架构Spring Boot Micrometer --指标采集-- Kafka --消息流-- Flink --清洗/聚合/分流-- StarRocks (metric_data / metric_series)三、Spring Boot MeterRegister 发送至kafka在此之前我们上报Prometheus的时候一般都是再Spring Boot中配置micrometer-registry-prometheus即可这种方式主要是Pull模式暴露/actuator/prometheus的指标然后由Prometheus抓取。同时还有一种Step模式即定义一个周期T周期结束时相关指标清零。特性 StepDelta模式 Cumulative累计模式值含义 当前时间窗的增量 从启动至今的总和是否清零 是周期切换后清零 否场景 Push 型采集系统 Pull 型采集系统数据延迟影响 延迟会导致丢失当前周期数据 延迟只影响分辨率不丢数据计算速率方式 注册表已按步长计算 采集端函数计算如果采用累计模式存储到数据库里则是不断增加的值如果要计算某一段分钟或者小时的值则需要使用窗口函数对数据库压力巨大。相比之下直接在采集的时候就做好差分对数据库压力很小故Step步长模式更适合我们。3.1 代码样例先是定义export的配置也可以做成通用的spring-boot-starter给别的服务用。DataComponentConfigurationProperties(prefix management.metrics.export.kafka)public class KafkaMeterRegistryConfig implements StepRegistryConfig {private String topic default_topic;private Duration step Duration.ofSeconds(30);OverrideNonNull public String prefix() {return management.metrics.export.kafka;}Overridepublic String get(NotNull String key) {return null;}OverrideNonNullpublic Duration step() {return step;}}其次通过扩展定义您自己的计量表注册表StepMeterRegistry如下所示public class KafkaMeterRegistry extends StepMeterRegistry {private final KafkaTemplateString, String kafkaTemplate;private final String topic;public KafkaMeterRegistry(KafkaMeterRegistryConfig config,Clock clock,KafkaTemplateString, String kafkaTemplate) {super(config, clock);this.kafkaTemplate kafkaTemplate;this.topic config.getTopic();start(new NamedThreadFactory(kafka-metrics-publisher));log.info(KafkaMeterRegistry created);}Overrideprotected void publish() {// 每个step结束后该方法被调用long ts System.currentTimeMillis();for (Meter meter : this.getMeters()) {//关键字窗口切换//不要在 publish() 里重复调用同一个 meter.measure() 多次否则第一次调用就清零了后面会拿到零值。//不要依赖 publish() 之后还去别处用同一个 StepMeterRegistry 的 Meter 做跨周期累计因为 step 运行就是为时间窗统计设计的跨周期值会被清零。for (Measurement ms : meter.measure()) {...准备数据kafkaTemplate.send(topic, jsonString);}}}NotNullOverrideprotected TimeUnit getBaseTimeUnit() {return TimeUnit.MILLISECONDS; // 发送数据时间单位}}最后创建注册表配置和计量表注册表相关的BEAN。如果您使用 Spring Boot可以按如下方式操作Configurationpublic class MetricsConfig {Beanpublic KafkaMeterRegistry customMeterRegistry(KafkaMeterRegistryConfig kafkaMeterRegistryConfig,Clock clock,KafkaTemplateString, String kafkaTemplate) {return new KafkaMeterRegistry(kafkaMeterRegistryConfig, clock, kafkaTemplate);}}3.2 底层如何实现Step模式目前核心疑问点在于是如何清零的是否有丢数据的可能在 Micrometer 中步长周期Step Interval的处理逻辑主要由 StepMeterRegistry 完成这类 Registry例如 DatadogMeterRegistry、AtlasMeterRegistry会确保每个时间窗口的指标值是该窗口的增量数据而不是自应用启动以来的累计值。Pasted image 20251126164809┌─ StepMeterRegistry.publish() ← 定时任务│├─ meter.measure() ← 对 StepCounter 调用 measure│├─ StepCounter.poll()│ ├─ double v count.get()│ ├─ count.set(0.0) ← 清零│ ├─ lastStepTime.set(now) ← 标记窗口开始时间│ └─ 返回当前窗口的值给 publish简单的说就是StepMeterRegistry 定时按 step 配置调用 publish()。publish() 会遍历所有的 meter包括 Counter。对 StepCounter 而言poll() 会返回当前窗口增量并清零。下一个步骤周期继续从 0 开始累加。因此清零是在采集周期触发其条件是now - lastStepTime stepMillis处理完后发送到kafka就是类似{ts: 2024-06-01 12:00:00.000,name: http_server_requests_total_time,env: prod,labels: { uri: /api/orders, method: POST },value: 3.14159}四、 存储方案对比我们以一串json为例{metricName: cpu_usage,labels: {host: server-01,region: us-east},timestamp: 2025-10-15T08:01:12.123Z,value: 0.73}直接可以对第一层key建表metricName、timestamp、value都可以直接约定为列目前问题的核心主要有两个1labels如何存储2针对Histogram、Summary、Gauge的上报如何处理。我们先来解决第一个问题labels如何存储。4.1 方案一字符串拼接如果维度比较少的情况下确实可以考虑通过字符串截取来实现比如上面的存储为server-01|us-east通过split竖线来获取响应的字段例如SPLIT_PART(server-01|us-east, |, 1) AS host_value,SPLIT_PART(server-01|us-east, |, 2) AS region_value这种如果labels多的情况下需要自行约定字符串的顺序而且数据量的情况下SPLIT_PART性能消耗也比较大。4.2 方案二指标表 标签维表 倒排索引倒排索引Micrometer采集Kafka队列Flink ETLStarRocks metric_dataStarRocks metric_series标签快速检索这种主要是想把指标和labels拆出来放到不同的两个表里同时使用一个唯一id关联为一批数据一开始有点豁然开朗后来一想1维度如果很多但是只查一个维度的数据这种关联不准确2如果要关联查询特别是维度多的情况下sql复杂度翻好几倍维护性大大降低。4.3 方案三Flat JSON 存储考虑了一下简单粗暴的方法labels直接存储为JSON列刚好看到starrocks的新特性Flat JSONFlat JSON的核心原理是在导入时检测JSON数据并从JSON数据中提取常用字段作为标准类型数据存储。在查询JSON时这些常用字段优化了JSON的查询速度刚好符合我们的条件。CREATE TABLE monitor_data(ts DATETIME COMMENT 指标时间,name STRING COMMENT 指标名称,env STRING COMMENT 环境,dc STRING COMMENT 数据中心,biz_id BIGINT COMMENT 业务 ID,labels JSON COMMENT 指标标签JSON 格式,value DOUBLE COMMENT 指标值)ENGINE OLAP DUPLICATE KEY(ts,name,env,dc,biz_id)COMMENT 监控PARTITION BY date_trunc(day, ts)DISTRIBUTED BY HASH(name,env,dc,biz_id)ORDER BY (name,ts)PROPERTIES (compression LZ4,flat_json.enable true,fast_schema_evolution false,partition_live_number 366,replicated_storage true,replication_num 3);五、Flink入库starrocks是可以直接通过jdbc方式入库的但是JDBC 适合低并发、小批量容易出现性能问题对于复杂多表、多源、字段更新场景Kafka Flink StarRocks 是最安全高效的方案。具体也不展开讲解这个配合下公司基建来即可。六、入库后样例写入数据后展示ts name env dc biz_id labels value2024-06-01 12:00:00 http_requests_count prod eu 0 3.14159最近 5 分钟各接口请求数以 http_requests_count 为例按 method/status 维度聚合SELECTJSON_VALUE(labels, $.method) AS method,JSON_VALUE(labels, $.status) AS status,SUM(value) AS total_requestsFROM monitor_dataWHERE name http_requests_countAND ts NOW() - INTERVAL 5 MINUTEAND env prodGROUP BY method, statusORDER BY total_requests DESC;