基本集群处理

This commit is contained in:
iVampireSP.com 2023-01-04 23:57:36 +08:00
parent 9bd0f11e16
commit 0a97078a83
No known key found for this signature in database
GPG Key ID: 2F7B001CA27A8132
8 changed files with 209 additions and 49 deletions

View File

@ -2,6 +2,7 @@
namespace App\Console\Commands\Cluster;
use App\Support\Cluster;
use Illuminate\Console\Command;
use Symfony\Component\Console\Command\Command as CommandAlias;
@ -28,6 +29,37 @@ class Log extends Command
*/
public function handle(): int
{
Cluster::listen('*', function ($event, $message) {
$this->format($event, $message);
}, false);
return CommandAlias::SUCCESS;
}
private function format(string $event, array $message = [])
{
$status = $this->switch($event);
if (!$status) {
return;
}
$message = "[{$message['node']['type']}] {$message['node']['id']}:{$event}: " . $status;
$this->info($message);
}
public function switch($event): string|null
{
$events = [
'node.ok' => '此节点初始化成功,并且已经加入集群。',
'node.online' => '此节点已经上线。',
'node.offline' => '将不再处理任何任务。',
'cluster_ready.ok' => 'Cluster Ready 就绪了,已经可以处理请求了。',
'config.updated' => '集群配置文件已经更新,请所有 slave 节点下载。',
'config.synced' => '我已下载配置文件。',
];
return $events[$event] ?? null;
}
}

View File

@ -1,33 +0,0 @@
<?php
namespace App\Console\Commands\Cluster;
use Illuminate\Console\Command;
use Symfony\Component\Console\Command\Command as CommandAlias;
class ReportSystem extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'cluster:report';
/**
* The console command description.
*
* @var string
*/
protected $description = '报告此系统';
/**
* Execute the console command.
*
* @return int
*/
public function handle(): int
{
return CommandAlias::SUCCESS;
}
}

View File

@ -16,7 +16,7 @@ class Sync extends Command
*
* @var string
*/
protected $signature = 'cluster:sync';
protected $signature = 'cluster:sync {--force=false}';
/**
* The console command description.
@ -37,20 +37,28 @@ public function handle(): int
$node_type = config('settings.node.type');
if ($node_type === 'master') {
$confirm = $this->ask('主节点同步将会恢复上一次数据,确定吗?', 'yes');
if ($confirm !== 'yes') {
// if force is true, delete the old file
if ($this->option('force') === 'true') {
$confirm = 'yes';
} else {
$confirm = $this->ask('主节点同步将会恢复上一次数据,确定吗?', true);
}
if (!$confirm) {
$this->warn('已取消。');
return CommandAlias::SUCCESS;
}
}
$cache_key = "master_config";
$cache_key = "master_config_zip";
$config = Cluster::get($cache_key);
if ($config) {
$this->info('检查下载目录的 MD5 值。');
$config_md5_key = "master_config_md5";
$config_md5_key = "master_config_zip_md5";
$config_md5 = Cluster::get($config_md5_key, '');
$md5 = md5($config);
@ -113,6 +121,9 @@ public function handle(): int
// 刷新配置
$this->info('正在刷新配置。');
Artisan::call('optimize');
// 上报消息
Cluster::publish('synced');
} else {
$this->error('没有找到缓存。请尝试从其他节点重新同步。');
return CommandAlias::FAILURE;

View File

@ -68,6 +68,8 @@ public function handle(): int
unlink(base_path('.env.temp'));
}
Cluster::publish('config.updated');
$this->info('节点初始化完成。');
if (app()->environment() === 'local') {
@ -112,12 +114,12 @@ public function upload($node_type)
$this->info('正在上传 config 目录。');
$cache_key = "${node_type}_config";
$cache_key = "${node_type}_config_zip";
Cluster::forever($cache_key, file_get_contents($cacheZip));
// md5
$this->info('正在报告 cache 目录的 MD5 值。');
$cache_md5_key = "${node_type}_config_md5";
$cache_md5_key = "${node_type}_config_zip_md5";
Cluster::forever($cache_md5_key, md5_file($cacheZip));
unlink($cacheZip);

View File

@ -2,7 +2,9 @@
namespace App\Console\Commands\Cluster;
use App\Support\Cluster;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Artisan;
use Symfony\Component\Console\Command\Command as CommandAlias;
class Work extends Command
@ -28,6 +30,107 @@ class Work extends Command
*/
public function handle(): int
{
$this->warn('正在初始化集群协调任务。');
Artisan::call('init');
Artisan::call('optimize');
$this->info('正在启动集群协调任务。');
$pid = pcntl_fork();
if ($pid === -1) {
$this->error('无法创建子进程。');
return CommandAlias::FAILURE;
} else if ($pid === 0) {
// 子进程
$this->report();
} else {
// 父进程
$this->work();
}
return CommandAlias::SUCCESS;
}
private function work(): void
{
$this->info('正在监听任务。');
Cluster::publish('node.online');
Cluster::listen('*', function ($event, $message) {
$this->dispatchEvent($event, $message);
}, false);
}
private function dispatchEvent($event, $message = []): void
{
$events = [
'config.updated' => function () {
$this->info('正在更新配置文件。');
Artisan::call('cluster:sync', [
'--force' => 'true',
]);
$this->info('配置文件更新完成。');
}
];
if (isset($events[$event])) {
$this->warn("正在处理 {$event} 事件。");
$events[$event]($message);
}
}
private function report(): void
{
$this->info('正在报告此系统,请保持此命令一直运行。');
$cpu = $this->getCpuUsage();
$memory = $this->getMemoryUsage();
while (1) {
Cluster::publish('system_usage', [
'cpu' => $cpu,
'memory' => $memory,
]);
sleep(1);
}
}
private function getCpuUsage(): float
{
// 获取 CPU 使用率
$cpu = sys_getloadavg();
return $cpu[0];
}
private function getMemoryUsage(): float
{
// 检查 free 命令是否存在
if (exec('which free')) {
$free = exec('free');
} else {
// fake free
$free = <<<EOF
total used free shared buff/cache available
Mem: 1982 334 1121 126 527 1380
Swap: 0 0 0
EOF;
}
$free = trim($free);
$free_arr = explode("\n", $free);
$mem = explode(" ", $free_arr[1]);
$mem = array_filter($mem);
$mem = array_merge($mem);
return round($mem[2] / $mem[1] * 100, 2);
}
}

View File

@ -2,7 +2,6 @@
namespace App\Support;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Redis;
class Cluster
@ -28,6 +27,7 @@ public static function isCluster(): bool
public static function publish($event, $data = []): void
{
Redis::publish('cluster_ready', json_encode([
'event' => $event,
'node' => [
'type' => config('settings.node.type'),
'id' => config('settings.node.id'),
@ -37,30 +37,61 @@ public static function publish($event, $data = []): void
]));
}
/**
* @param string|array $events 事件名称
* @param $callback callable 回调函数,接收一个参数,为事件数据。
* @param $ignore_self bool 是否忽略此节点的消息。
*
* @return void
*/
public static function listen(string|array $events, callable $callback, bool $ignore_self = true): void
{
// socket timeout
ini_set('default_socket_timeout', -1);
Redis::subscribe('cluster_ready', function ($message) use ($events, $callback, $ignore_self) {
$message = json_decode($message, true);
if ($ignore_self && $message['node']['id'] === config('settings.node.id')) {
return;
}
if (is_array($events)) {
if (in_array($message['event'], $events)) {
$callback($message['event'], $message);
}
} else {
if ($events === '*' || $events === $message['event']) {
$callback($message['event'], $message);
}
}
});
}
public static function hset($key, $value, $data = []): void
{
Redis::hset($key, $value, json_encode($data));
Redis::hset(self::$prefix . $key, $value, json_encode($data));
}
public static function get($key, $default = null): string|array|null
{
return Cache::get(self::$prefix . $key, $default);
return Redis::get(self::$prefix . $key, $default);
}
public static function set($key, $value, $ttl = null): void
{
Cache::put(self::$prefix . $key, $value, $ttl);
Redis::set(self::$prefix . $key, $value, $ttl);
}
public static function forget($key): void
{
Cache::forget(self::$prefix . $key);
Redis::forget(self::$prefix . $key);
}
// forever
public static function forever($key, $value): void
{
Cache::forever(self::$prefix . $key, $value);
self::set($key, $value, -1);
}
public static function hget($key, $value, $default = null): string|array|null
@ -68,7 +99,6 @@ public static function hget($key, $value, $default = null): string|array|null
return Redis::hget($key, $value, $default);
}
public static function registerThisNode(): void
{
$node_id = config('settings.node.id');
@ -79,6 +109,6 @@ public static function registerThisNode(): void
'ip' => config('settings.node.ip'),
]);
Cluster::publish('node_init');
Cluster::publish('node.ok');
}
}

View File

@ -26,7 +26,9 @@
"simplesoftwareio/simple-qrcode": "^4.2",
"spiral/roadrunner": "^2.8.2",
"symfony/psr-http-message-bridge": "^2.1",
"yansongda/laravel-pay": "~3.2.0"
"yansongda/laravel-pay": "~3.2.0",
"ext-pcntl": "*",
"ext-posix": "*"
},
"require-dev": {
"beyondcode/laravel-query-detector": "^1.6",

View File

@ -0,0 +1,13 @@
[program:lae-worker]
process_name=%(program_name)s_%(process_num)02d
directory=/opt/lae
command=php artisan cluster:work
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/lae-cluster.log
stopwaitsecs=3600