改进 任务调度

This commit is contained in:
iVampireSP.com 2023-02-13 02:21:09 +08:00
parent fafcf479ad
commit 3abbc86951
No known key found for this signature in database
GPG Key ID: 2F7B001CA27A8132
6 changed files with 173 additions and 173 deletions

View File

@ -22,44 +22,45 @@ class Kernel extends ConsoleKernel
* Define the application's command schedule. * Define the application's command schedule.
* *
* @param Schedule $schedule * @param Schedule $schedule
*
* @return void * @return void
*/ */
protected function schedule(Schedule $schedule): void protected function schedule(Schedule $schedule): void
{ {
// 清理过期的 Token // 清理过期的 Token
$schedule->command('sanctum:prune-expired --hours=24')->daily()->runInBackground(); $schedule->command('sanctum:prune-expired --hours=24')->daily()->runInBackground()->onOneServer()->name("清理过期的 Token。");
// 扣费 // 扣费
$schedule->job(new DispatchHostCostQueueJob(now()->minute))->everyMinute()->withoutOverlapping()->onOneServer(); $schedule->job(new DispatchHostCostQueueJob(now()->minute))->everyMinute()->withoutOverlapping()->onOneServer()->name("部署扣费任务");
// 获取模块暴露的信息(服务器等,检查模块状态) // 获取模块暴露的信息(服务器等,检查模块状态)
$schedule->job(new DispatchFetchModuleJob())->withoutOverlapping()->everyMinute(); $schedule->job(new DispatchFetchModuleJob())->withoutOverlapping()->everyMinute()->name("获取模块暴露的信息(服务器等,检查模块状态)");
// 推送工单 // 推送工单
$schedule->job(new PushWorkOrderJob())->everyMinute()->onOneServer(); $schedule->job(new PushWorkOrderJob())->everyMinute()->onOneServer()->name("推送工单");
// 自动关闭工单 // 自动关闭工单
$schedule->job(new AutoCloseWorkOrderJob())->everyMinute()->onOneServer(); $schedule->job(new AutoCloseWorkOrderJob())->everyMinute()->onOneServer()->name("自动关闭工单");
// 清理任务 // 清理任务
$schedule->job(new ClearTasksJob())->weekly()->onOneServer(); $schedule->job(new ClearTasksJob())->weekly()->onOneServer()->name("清理大于 1 天的任务");
// 删除暂停或部署时间超过 3 天以上的主机 // 删除暂停或部署时间超过 3 天以上的主机
$schedule->job(new DeleteHostJob())->hourly()->onOneServer(); $schedule->job(new DeleteHostJob())->hourly()->onOneServer()->name("删除暂停或部署时间超过 3 天以上的主机");
// 检查主机是否存在于模块 // 检查主机是否存在于模块
$schedule->job(new ScanAllHostsJob())->everyThirtyMinutes()->withoutOverlapping()->onOneServer(); $schedule->job(new ScanAllHostsJob())->everyThirtyMinutes()->withoutOverlapping()->onOneServer()->name("检查主机是否存在于模块");
// 检查未充值的订单,并充值 // 检查未充值的订单,并充值
$schedule->job(new CheckAndChargeBalanceJob())->everyFiveMinutes()->onOneServer()->withoutOverlapping(); $schedule->job(new CheckAndChargeBalanceJob())->everyFiveMinutes()->onOneServer()->withoutOverlapping()->name("检查未充值的订单,并充值");
// 发送模块收益 // 发送模块收益
$schedule->job(new SendModuleEarningsJob())->dailyAt('20:00')->onOneServer(); $schedule->job(new SendModuleEarningsJob())->dailyAt('20:00')->onOneServer()->name("发送模块收益");
// 回滚临时用户组 // 回滚临时用户组
$schedule->job(new RollbackUserTempGroupJob())->everyMinute()->onOneServer(); $schedule->job(new RollbackUserTempGroupJob())->everyMinute()->onOneServer()->name("回滚临时用户组");
// 设置生日用户组 // 设置生日用户组
$schedule->job(new SetBirthdayGroupJob())->dailyAt('00:00')->onOneServer(); $schedule->job(new SetBirthdayGroupJob())->dailyAt('00:00')->onOneServer()->name("设置生日用户组");
} }
/** /**

View File

@ -12,16 +12,20 @@ class DispatchHostCostQueueJob implements ShouldQueue
{ {
use InteractsWithQueue, Queueable, SerializesModels; use InteractsWithQueue, Queueable, SerializesModels;
public int $minute; protected int $minute;
protected ?Host $host;
/** /**
* Create a new job instance. * Create a new job instance.
* *
* @return void * @return void
*/ */
public function __construct($minute) public function __construct($minute, Host $host = null)
{ {
$this->minute = $minute; $this->minute = $minute;
$this->host = $host;
$this->onQueue('host-cost');
} }
/** /**
@ -31,6 +35,7 @@ public function __construct($minute)
*/ */
public function handle(): void public function handle(): void
{ {
if (!$this->host) {
$host = new Host(); $host = new Host();
if (app()->environment() != 'local') { if (app()->environment() != 'local') {
@ -39,8 +44,11 @@ public function handle(): void
$host->whereIn('status', ['running', 'stopped'])->with('user')->chunk(500, function ($hosts) { $host->whereIn('status', ['running', 'stopped'])->with('user')->chunk(500, function ($hosts) {
foreach ($hosts as $host) { foreach ($hosts as $host) {
dispatch(new RealHostCostJob($host, $host->getPrice()))->onQueue('host-cost'); dispatch(new self($this->minute, $host));
} }
}); });
} }
$this->host?->cost($this->host->getPrice());
}
} }

View File

@ -1,41 +0,0 @@
<?php
namespace App\Jobs\Host;
use App\Models\Host;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class RealHostCostJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public Host $host;
public string $price;
/**
* Create a new job instance.
*
* @param Host $host
* @param string $price
*/
public function __construct(Host $host, string $price)
{
$this->host = $host;
$this->price = $price;
}
/**
* Execute the job.
*
* @return void
*/
public function handle(): void
{
$this->host->cost($this->price);
}
}

View File

@ -0,0 +1,49 @@
<?php
namespace App\Jobs\Host;
use App\Models\Host;
use App\Notifications\User\UserNotification;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class SendRenewNotificationJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected ?Host $host;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(?Host $host)
{
$this->host = $host;
}
/**
* Execute the job.
*
* @return void
*/
public function handle(): void
{
if (!$this->host) {
// 获取 Host距离今天刚好 7 天的 Host
Host::where('next_due_at', '>', now()->addDays(7)->startOfDay())
->where('next_due_at', '<', now()->addDays(7)->endOfDay())
->chunk(100, function ($hosts) {
foreach ($hosts as $host) {
dispatch(new self($host));
}
});
}
$this->host?->user->notify(new UserNotification("续费提醒", "您的 {$this->host->name} 将在 7 天后到期,请及时续费。", true));
}
}

View File

@ -3,15 +3,31 @@
namespace App\Jobs\Module; namespace App\Jobs\Module;
use App\Models\Module; use App\Models\Module;
use Exception;
use Illuminate\Bus\Queueable; use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels; use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
class DispatchFetchModuleJob implements ShouldQueue class DispatchFetchModuleJob implements ShouldQueue
{ {
use InteractsWithQueue, Queueable, SerializesModels; use InteractsWithQueue, Queueable, SerializesModels;
protected ?Module $module;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(Module $module = null)
{
$this->module = $module;
}
/** /**
* Execute the job. * Execute the job.
* *
@ -19,10 +35,81 @@ class DispatchFetchModuleJob implements ShouldQueue
*/ */
public function handle(): void public function handle(): void
{ {
if (!$this->module) {
(new Module)->whereNotNull('url')->chunk(100, function ($modules) { (new Module)->whereNotNull('url')->chunk(100, function ($modules) {
foreach ($modules as $module) { foreach ($modules as $module) {
dispatch(new FetchModuleJob($module)); dispatch(new self($module));
} }
}); });
} }
if ($this->module) {
$module = $this->module;
$servers = [];
try {
$response = $module->http()->get('remote');
} catch (Exception $e) {
Log::debug('无法连接到模块 - down: ' . $e->getMessage());
// 如果模块状态不为 down则更新为 down
if ($module->status !== 'down') {
$module->status = 'down';
$module->save();
}
return;
}
if ($response->successful()) {
// 如果模块状态不为 up则更新为 up
if ($module->status !== 'up') {
$module->status = 'up';
Log::debug('模块状态更新为 up: ' . $module->name);
}
$json = $response->json();
if (isset($json['servers']) && is_array($json['servers'])) {
// 只保留 name, status, meta
$servers = array_merge($servers, array_map(function ($server) use ($module) {
return [
'name' => $server['name'],
'status' => $server['status'],
'meta' => $server['meta'] ?? [],
'created_at' => $server['created_at'] ?? now(),
'updated_at' => $server['updated_at'] ?? now(),
'module' => [
'id' => $module->id,
'name' => $module->name,
],
];
}, $json['servers']));
// broadcast(new Servers($servers));
}
} else {
// if module return maintenance, then set module status to maintenance
$status = $response->status();
if ($status == 503 || $status == 429 || $status == 502) {
$module->status = 'maintenance';
} else {
$module->status = 'down';
}
Log::debug('模块状态更新为 ' . $module->status . ': ' . $module->name);
}
$module->save();
// if local
if (config('app.env') === 'local') {
Cache::forever('module:' . $module->id . ':servers', $servers);
} else {
Cache::put('module:' . $module->id . ':servers', $servers, now()->addMinutes(10));
}
}
}
} }

View File

@ -1,104 +0,0 @@
<?php
namespace App\Jobs\Module;
use App\Models\Module;
use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
class FetchModuleJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected Module $module;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(Module $module)
{
$this->module = $module;
}
/**
* Execute the job.
*
* @return void
*/
public function handle(): void
{
$module = $this->module;
$servers = [];
try {
$response = $module->http()->get('remote');
} catch (Exception $e) {
Log::debug('无法连接到模块 - down: '.$e->getMessage());
// 如果模块状态不为 down则更新为 down
if ($module->status !== 'down') {
$module->status = 'down';
$module->save();
}
return;
}
if ($response->successful()) {
// 如果模块状态不为 up则更新为 up
if ($module->status !== 'up') {
$module->status = 'up';
Log::debug('模块状态更新为 up: '.$module->name);
}
$json = $response->json();
if (isset($json['servers']) && is_array($json['servers'])) {
// 只保留 name, status, meta
$servers = array_merge($servers, array_map(function ($server) use ($module) {
return [
'name' => $server['name'],
'status' => $server['status'],
'meta' => $server['meta'] ?? [],
'created_at' => $server['created_at'] ?? now(),
'updated_at' => $server['updated_at'] ?? now(),
'module' => [
'id' => $module->id,
'name' => $module->name,
],
];
}, $json['servers']));
// broadcast(new Servers($servers));
}
} else {
// if module return maintenance, then set module status to maintenance
$status = $response->status();
if ($status == 503 || $status == 429 || $status == 502) {
$module->status = 'maintenance';
} else {
$module->status = 'down';
}
Log::debug('模块状态更新为 '.$module->status.': '.$module->name);
}
$module->save();
// if local
if (config('app.env') === 'local') {
Cache::forever('module:'.$module->id.':servers', $servers);
} else {
Cache::put('module:'.$module->id.':servers', $servers, now()->addMinutes(10));
}
}
}