diff --git a/AGENTS.md b/AGENTS.md index 0e18b5f8..6f179043 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -36,7 +36,7 @@ api/lib/ ├── internals/ │ ├── controller/ # HTTP I/O(Echo)。DB アクセス禁止 │ └── repository/ # SQL 実行のみ。*sql.Rows を返す -└── externals/{db,server,slack} +└── externals/{db,server,slack,scheduler} # scheduler: 通知の定期実行 ticker ループ mobile/lib/ ├── pages/ # 画面(StatefulWidget) @@ -49,6 +49,12 @@ mobile/lib/ gas/{shift,task,user,rescue}/ # ドメイン別。コード.js / onChange.js 等 ``` +## 通知の定期実行 + +未送信の通知ログ(`action_logs` の `is_sent = false`)は、`externals/scheduler` の ticker ループが API プロセス内で5分間隔に `NotificationUseCase.ProcessUnsentNotifications` を呼び、Slack DM へ flush します(`di.go` で配線。`cmd/send-notifications` は手動 flush 用として併存)。 + +**API は単一インスタンス前提**です。複数レプリカで動かすと各プロセスの ticker が同じ未送信ログを拾って二重送信します。本番(`docker-compose.prod.yml`)は API を1レプリカで運用しているため現状は問題ありません。複数レプリカ化する場合はリーダー選出や排他制御が必要です。 + ## Code Style ### Go (`api/`) diff --git a/api/lib/di/di.go b/api/lib/di/di.go index b1955654..ceb71289 100755 --- a/api/lib/di/di.go +++ b/api/lib/di/di.go @@ -1,9 +1,13 @@ package di import ( - "log" + "context" + "time" + "github.com/NUTFes/SeeFT/api/lib/externals/db" + "github.com/NUTFes/SeeFT/api/lib/externals/scheduler" "github.com/NUTFes/SeeFT/api/lib/externals/server" + "github.com/NUTFes/SeeFT/api/lib/externals/slack" "github.com/NUTFes/SeeFT/api/lib/internals/controller" "github.com/NUTFes/SeeFT/api/lib/internals/repository" "github.com/NUTFes/SeeFT/api/lib/internals/repository/abstract" @@ -11,11 +15,11 @@ import ( "github.com/NUTFes/SeeFT/api/lib/usecase" ) -func InitializeServer() db.Client { +func InitializeServer(ctx context.Context) (db.Client, error) { // DB接続 client, err := db.ConnectMySQL() if err != nil { - log.Fatal("db error") + return nil, err } crud := abstract.NewCrud(client) @@ -93,8 +97,22 @@ func InitializeServer() db.Client { reviewController, ) + // Scheduler: 5分間隔で未送信通知を flush する(goroutine で起動し即 return) + slackService, err := slack.NewSlackService() + if err != nil { + return nil, err + } + notificationUseCase := usecase.NewNotificationUseCase( + actionLogRepository, slackService, + userRepository, dateRepository, timeRepository, + taskRepository, shiftRepository, weatherRepository, + ) + scheduler.New("notification", 5*time.Minute, notificationUseCase.ProcessUnsentNotifications).Start(ctx) + // Server - server.RunServer(router) + if err := server.RunServer(ctx, router); err != nil { + return client, err + } - return client + return client, nil } diff --git a/api/lib/externals/scheduler/scheduler.go b/api/lib/externals/scheduler/scheduler.go new file mode 100644 index 00000000..1e36c6bd --- /dev/null +++ b/api/lib/externals/scheduler/scheduler.go @@ -0,0 +1,48 @@ +package scheduler + +import ( + "context" + "log" + "time" +) + +// Job は scheduler が定期実行する処理。usecase を import せず関数型で受け取り、 +// scheduler と業務ロジックを疎結合に保つ。 +type Job func(ctx context.Context) error + +// Scheduler は「名前・間隔・実行する処理」を保持する。 +type Scheduler struct { + name string + interval time.Duration + job Job +} + +// New はコンストラクタでSchedulerを生成する。 +func New(name string, interval time.Duration, job Job) *Scheduler { + return &Scheduler{name: name, interval: interval, job: job} +} + +// Start は ticker ループを goroutine で起動し、即座に return する。 +// interval ごとに job を実行し、job が返したエラーはログ出力のみ(ループは止めない)。 +func (s *Scheduler) Start(ctx context.Context) { + go func() { + if err := s.job(ctx); err != nil { + log.Printf("[scheduler:%s] job error (initial): %v", s.name, err) + } + + ticker := time.NewTicker(s.interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if err := s.job(ctx); err != nil { + log.Printf("[scheduler:%s] job error: %v", s.name, err) + } + case <-ctx.Done(): + log.Printf("[scheduler:%s] stopped: %v", s.name, ctx.Err()) + return + } + + } + }() +} diff --git a/api/lib/externals/server/server.go b/api/lib/externals/server/server.go index e73cd8cc..78da850e 100755 --- a/api/lib/externals/server/server.go +++ b/api/lib/externals/server/server.go @@ -1,8 +1,10 @@ package server import ( + "context" "net/http" "os" + "time" _ "github.com/NUTFes/SeeFT/api/docs" "github.com/NUTFes/SeeFT/api/lib/router" @@ -11,7 +13,7 @@ import ( echoSwagger "github.com/swaggo/echo-swagger" ) -func RunServer(router router.Router) { +func RunServer(ctx context.Context, router router.Router) error { // echoのインスタンス e := echo.New() @@ -39,6 +41,25 @@ func RunServer(router router.Router) { // swagger e.GET("/swagger/*", echoSwagger.WrapHandler) + errCh := make(chan error, 1) + // サーバー起動 - e.Logger.Fatal(e.Start(":1234")) + go func() { + + if err := e.Start(":1234"); err != nil && err != http.ErrServerClosed { + errCh <- err + } + }() + + select { + case err := <-errCh: + return err + case <-ctx.Done(): + + } + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + return e.Shutdown(shutdownCtx) + } diff --git a/api/main.go b/api/main.go index 3a3b1a0e..999f2dbd 100755 --- a/api/main.go +++ b/api/main.go @@ -1,13 +1,30 @@ package main import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "github.com/NUTFes/SeeFT/api/lib/di" _ "github.com/lib/pq" ) func main() { - client := di.InitializeServer() - defer client.CloseDB() + if err := run(); err != nil { + log.Fatal(err) + } +} + +func run() error { + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + client, err := di.InitializeServer(ctx) + if client != nil { + defer client.CloseDB() + } + return err } // func main() async {