SonarQube服务端任务执行源码分析

SonarQube 服务端CE报告分析执行源码分析

一、前言

近期在对SonarQube源码进行改造时,遇到了一些问题,为更有效的解决这些问题,有必要对SonarQube的执行源码进入一定深入的分析,特测记录

CE 是ComputeEngine的简写,SonarQube 社区版的服务启动后实际上是有三个服务的:

  • web server
  • ce
  • search server

CE节点主要职能是将本地客户端分析源码后通过web api 提交上的分析报告进行进一步加工(上个版本、分支、PR差异),其中涉及几个关键点:

  • CeWorker
  • CeTask
  • CeQueue

CeWorker是干活的人,CeTask则是要干的活,同时怎么干也在CeTask上写好了,CeQueue是目前在排队的任务,下面就依次对这三部分代码展开具体的分析

二、Worker启动核心代码

下面从最底层的CeWorker接口,回溯woker线程启动的前世今生

CeWorker Interface

负责轮询CeQueue并执行CeTaskrunnable的标记接口。Callable#call()返回一个布尔值,当处理某个 CeTask 时为状态为true,否则为false

CeWorkerImpl

实现了CeWoker接口,由CeWorkerFactory.create方法实例化

其中tryAndFindTaskToExecute 方法,查询数据库中现有未执行任务并执行处理

1
2
3
4
5
6
7
8
9
private Optional<CeTask> tryAndFindTaskToExecute() {
excludeIndexationJob = !excludeIndexationJob;
try {
return queue.peek(uuid, excludeIndexationJob);
} catch (Exception e) {
LOG.error("Failed to pop the queue of analysis reports", e);
}
return Optional.empty();
}

CeWorkerFactory

创建ceWorker的工厂

CeProcessingSchedulerImpl

一年之计:管理CE进程的工作计划

CeProcessingScheduler实例化时,会根据线程数配置调用ceWorkerFactory.create()方法创建出对应数量的worker出来

1
2
3
4
5
6
7
8
9
10
public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
//... ...
int threadWorkerCount = ceConfiguration.getWorkerMaxCount();
this.chainingCallbacks = new ChainingCallback[threadWorkerCount];
// 根据线程数配置创建worker 多一个worker 就可以同时多分析一份报告
for (int i = 0; i < threadWorkerCount; i++) {
CeWorker worker = ceCeWorkerFactory.create(i);
chainingCallbacks[i] = new ChainingCallback(worker);
}
}

然后再调用startScheduling方法,开始定时任务

1
2
3
4
5
6
7
@Override
public void startScheduling() {
for (ChainingCallback chainingCallback : chainingCallbacks) {
ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenEnabledTasks, timeUnit);
addCallback(future, chainingCallback, MoreExecutors.directExecutor());
}
}

CeQueueInitializer

搭箭上弦

清理队列,初始化JMX计数器,然后安排工作者的执行。

允许不阻止worker在队列准备好之前偷看队列。

1
2
3
4
5
private void initCe() {
ceDistributedInformation.broadcastWorkerUUIDs();
processingScheduler.startScheduling();
cleaningScheduler.startScheduling();
}

CeQueueInitialiter由

ComputeEngineContainerImpl

万事俱备

ComputeEngineContainerImpl的start方法内调用 startupTasks();

serverLifecycleNotifier.notifyStart()通过onServerStart方法再调用CeQueueInitializer.initCe()

CeDistributedInformationImpl

为SonarQube的集群实例中提供CeWoker的UUID信息,底层是基于HazelCast提供的java接口的分布式实现

StandaloneCeDistributedInformation

为SonarQube的集群实例中提供CeWoker的UUID信息

HazelcastMemberBuilder

负责hazelcast 实例创建

三、数据流

开启postgresql 的 SQL 日志,执行一次源码分析操作,就可以从PG的日志中看到任务提交后数据流转的过程

开启PG的SQL语句日志可以通过log_statement参数,该参数控制哪些 SQL 语句被记录,可选的有效值有 none (off)、ddlmodall(所有语句)

log_statement各参数说明:

ddl记录所有数据定义语句,例如CREATE、ALTER和 DROP语句。

mod记录所有ddl语句,外加数据修改语句例如INSERT, UPDATE、DELETE、TRUNCATE, 和COPY FROM。 如果PREPARE、EXECUTE和 EXPLAIN ANALYZE包含合适类型的命令,它们也会被记录。对于使用扩展查询协议的客户端,当收到一个执行消息时会产生日志并且会包括绑定参数的值(任何内嵌的单引号会被双写)

all 记录所有sql 包括select语句

我们需要看到数据插入、更新、删除的过程,mod模式刚好适合,执行以下SQL修改:

1
2
3
alter system set log_statement= 'mod'; 
SELECT pg_reload_conf();
show log_statement;

开启之后,可以在PG的日志文件中查看SQL执行记录,首先我们会看到很多这样的日志:

1
2
3
4
5
6
7
8
9
2021-06-12 22:18:26.646 CST [17162] LOG:  execute S_6: 
update ce_queue set
status='PENDING',
updated_at=$1
where
status <> 'PENDING'
and worker_uuid = $2
2021-06-12 22:18:26.646 CST [17162] DETAIL:
parameters: $1 = '1623507506646', $2 = 'AXoAZ-WbCnCI8SGCfcIy'

这是空闲的CeWoker在扫描任务队列中,将属于它的任务状态修改为PENDING(猜测任务状态可能有其他情况,需要被重置回来),由于该SQL执行频率很高, 对我们的分析工作会造成一定干扰,在真正执行CE任务前,我们先把试试能不能SonarQube服务的CE进程kill掉

在运行SonarQube的容器内,执行ps -ef | grep CeServer 找到CE进程,结果如下:

1
2
sonar@8b5e36d897c5:/opt/sonarqube$ ps -ef | grep CeServer
sonar 75 1 0 13:27 ? 00:00:36 /usr/local/openjdk-11/bin/java -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djava.io.tmpdir=/opt/sonarqube/temp -XX:-OmitStackTraceInFastThrow --add-opens=java.base/java.util=ALL-UNNAMED -Xmx512m -Xms128m -XX:+HeapDumpOnOutOfMemoryError -javaagent:/opt/sonarqube/extensions/plugins/sonarqube-community-branch-plugin-1.8.0.jar=ce -Dhttp.nonProxyHosts=localhost|127.*|[::1] -cp ./lib/sonar-application-8.9-SNAPSHOT.jar:/opt/sonarqube/lib/jdbc/postgresql/postgresql-42.2.19.jar org.sonar.ce.app.CeServer /opt/sonarqube/temp/sq-process5197976280920372167properties

很遗憾,kill掉进程后,容器直接挂掉了,既然如此,那就直接过滤日志吧

还可以通过修改源代码删除掉CE进行的启动部分的代码来实现这个效果,不过相对比较麻烦,后面还要再改回来,就不用这种方式了,具体如下,注释掉tryToStartCe方法的调用即可,有兴趣可以自己改一下:

server/sonar-main/src/main/java/org/sonar/application/SchedulerImpl.java`

1
2
3
4
5
6
> private void tryToStartAll() throws InterruptedException {
> tryToStartEs();
> tryToStartWeb();
> // tryToStartCe();
> }
>

突然,我在一堆相似的日志中发现一条与众不同的记录:

1
2
3
4
5
6
7
8
9
10
11
UPDATE ce_queue 
SET status = 'PENDING',
worker_uuid = NULL,
updated_at =$1
WHERE
status = 'IN_PROGRESS'
AND (
worker_uuid IS NULL
OR
worker_uuid NOT IN ( $2, $3, $4, $5, $6, $7, $8, $9, $10,$11 )
)

筛选后,可以看到它的频率大概两分钟一次

image-20210614191011521

此前,曾遇到部署多节点后,部分任务被莫名取消,现在来看,很可能就只因为这条SQL导致其他节点的分析任务worker_uuid被重置为null,cewoker进程分析完成后通过wokerid+taskid找不到数据导致

image-20210614203740989

对应具体代码为:

1
2
3
4
5
6
7
8
private void resetTasksWithUnknownWorkerUUIDs() {
try {
LOG.trace("Resetting state of tasks with unknown worker UUIDs");
internalCeQueue.resetTasksWithUnknownWorkerUUIDs(ceDistributedInformation.getWorkerUUIDs());
} catch (Exception e) {
LOG.warn("Failed to reset tasks with unknown worker UUIDs", e);
}
}

参考资料:PostgreSQL 日志参数log_statement

1
2


三、底层实现相关知识

ListeningExecutorService