apskel-pos-backend/internal/processor/notification_processor.go
2026-05-10 10:57:38 +07:00

339 lines
12 KiB
Go

package processor
import (
"context"
"fmt"
"time"
"apskel-pos-be/internal/client"
"apskel-pos-be/internal/entities"
"apskel-pos-be/internal/mappers"
"apskel-pos-be/internal/models"
"github.com/google/uuid"
)
// NotificationRepository is the persistence interface for notification
// entities that the processor depends on.
//
// Within this file the processor calls Create (from Send and Broadcast),
// Update (to store SentAt once dispatch has run) and GetByID. Delete and List
// belong to the contract but are not called by the code visible here.
type NotificationRepository interface {
Create(ctx context.Context, notification *entities.Notification) error
GetByID(ctx context.Context, id uuid.UUID) (*entities.Notification, error)
Update(ctx context.Context, notification *entities.Notification) error
Delete(ctx context.Context, id uuid.UUID) error
// List takes a free-form filter map plus limit/offset and returns the
// matching notifications together with an int64.
// NOTE(review): the int64 is presumably the total match count — confirm
// against the implementation.
List(ctx context.Context, filters map[string]interface{}, limit, offset int) ([]*entities.Notification, int64, error)
}
// NotificationReceiverRepository is the persistence interface for
// per-user notification receiver records that the processor depends on.
//
// Usage inside this file: Create (Send), BulkCreate (Broadcast), GetByID
// (MarkAsRead, DeleteForUser), Update (MarkAsRead, MarkAllAsRead),
// ListByUserID (MarkAllAsRead, ListForUser), CountUnreadByUserID (ListForUser)
// and SoftDeleteByID (DeleteForUser). GetByNotificationAndUser is not called
// by the code visible here.
type NotificationReceiverRepository interface {
Create(ctx context.Context, receiver *entities.NotificationReceiver) error
BulkCreate(ctx context.Context, receivers []*entities.NotificationReceiver) error
GetByID(ctx context.Context, id uuid.UUID) (*entities.NotificationReceiver, error)
GetByNotificationAndUser(ctx context.Context, notificationID, userID uuid.UUID) (*entities.NotificationReceiver, error)
Update(ctx context.Context, receiver *entities.NotificationReceiver) error
// ListByUserID returns one page of a user's receiver records plus an int64
// that ListForUser reports to its caller as the total. MarkAllAsRead passes
// a pointer to false as isRead to select unread records.
// NOTE(review): a nil isRead presumably disables the read-state filter —
// confirm against the implementation.
ListByUserID(ctx context.Context, userID uuid.UUID, isRead *bool, limit, offset int) ([]*entities.NotificationReceiver, int64, error)
CountUnreadByUserID(ctx context.Context, userID uuid.UUID) (int64, error)
SoftDeleteByID(ctx context.Context, id uuid.UUID) error
}
// NotificationDeliveryRepository is the persistence interface for
// per-device delivery records that the processor depends on.
//
// Inside this file only BulkCreate and Update are called, both from
// dispatchFCMToUser: records are bulk-created before the push is attempted
// and updated one by one with the outcome afterwards. Create, GetByID and
// ListByReceiverID are not called by the code visible here.
type NotificationDeliveryRepository interface {
Create(ctx context.Context, delivery *entities.NotificationDelivery) error
BulkCreate(ctx context.Context, deliveries []*entities.NotificationDelivery) error
GetByID(ctx context.Context, id uuid.UUID) (*entities.NotificationDelivery, error)
Update(ctx context.Context, delivery *entities.NotificationDelivery) error
ListByReceiverID(ctx context.Context, receiverID uuid.UUID) ([]*entities.NotificationDelivery, error)
}
// NotificationUserDeviceRepository is a minimal interface to fetch the
// devices registered for a user.
//
// dispatchFCMToUser calls GetByUserID and uses the FCMToken and ID of each
// returned device; devices with an empty FCMToken are skipped there.
type NotificationUserDeviceRepository interface {
GetByUserID(ctx context.Context, userID uuid.UUID) ([]*entities.UserDevice, error)
}
// NotificationUserRepository is a minimal interface to fetch users by
// organization.
//
// Broadcast calls GetActiveUsers with the request's OrganizationID and creates
// one receiver record per returned user.
type NotificationUserRepository interface {
GetActiveUsers(ctx context.Context, organizationID uuid.UUID) ([]*entities.User, error)
}
// NotificationProcessor defines the business logic interface for creating,
// dispatching and managing user notifications.
type NotificationProcessor interface {
// Send creates a notification for an explicit list of receiver user IDs
// and dispatches it to them.
Send(ctx context.Context, req *models.SendNotificationRequest) (*models.NotificationResponse, error)
// Broadcast creates a notification for the active users of the
// organization named in the request and dispatches it to them.
Broadcast(ctx context.Context, req *models.BroadcastNotificationRequest) (*models.NotificationResponse, error)
// MarkAsRead marks one receiver record as read on behalf of userID.
MarkAsRead(ctx context.Context, receiverID, userID uuid.UUID) (*models.NotificationReceiverResponse, error)
// MarkAllAsRead marks the user's unread receiver records as read.
MarkAllAsRead(ctx context.Context, userID uuid.UUID) error
// DeleteForUser soft-deletes one receiver record on behalf of userID.
DeleteForUser(ctx context.Context, receiverID, userID uuid.UUID) error
// ListForUser returns, in order: the page of receiver records, the total,
// the unread count, and an error.
ListForUser(ctx context.Context, req *models.ListNotificationsRequest) ([]*models.NotificationReceiverResponse, int64, int64, error)
// GetByID returns a single notification by its ID.
GetByID(ctx context.Context, id uuid.UUID) (*models.NotificationResponse, error)
}
// NotificationProcessorImpl holds the repositories and the push client used
// by the notification methods defined in this file. Instances are built with
// NewNotificationProcessor.
type NotificationProcessorImpl struct {
// notificationRepo stores the notification entities themselves.
notificationRepo NotificationRepository
// receiverRepo stores the per-user receiver records.
receiverRepo NotificationReceiverRepository
// deliveryRepo stores the per-device delivery records written during dispatch.
deliveryRepo NotificationDeliveryRepository
// userDeviceRepo supplies the devices (and their FCM tokens) of a user.
userDeviceRepo NotificationUserDeviceRepository
// userRepo supplies the active users of an organization for Broadcast.
userRepo NotificationUserRepository
// fcmClient sends the push messages; dispatchFCMToUser returns without
// doing anything when it is nil.
fcmClient client.FCMClient
}
// NewNotificationProcessor wires the given repositories and FCM client into a
// new NotificationProcessorImpl. The arguments are stored as-is; no validation
// is performed here.
func NewNotificationProcessor(
	notificationRepo NotificationRepository,
	receiverRepo NotificationReceiverRepository,
	deliveryRepo NotificationDeliveryRepository,
	userDeviceRepo NotificationUserDeviceRepository,
	userRepo NotificationUserRepository,
	fcmClient client.FCMClient,
) *NotificationProcessorImpl {
	processor := new(NotificationProcessorImpl)

	// Persistence dependencies.
	processor.notificationRepo = notificationRepo
	processor.receiverRepo = receiverRepo
	processor.deliveryRepo = deliveryRepo

	// User and device lookups.
	processor.userDeviceRepo = userDeviceRepo
	processor.userRepo = userRepo

	// Push transport.
	processor.fcmClient = fcmClient

	return processor
}
// Send creates a notification and dispatches it to the given receiver user IDs via FCM.
//
// The notification row is created first, then one receiver record is created
// per distinct user ID and a push is dispatched for each record that was
// stored. Receiver creation is best-effort: a failure for one user does not
// stop the others. An error is returned when the request is nil, when it has
// no receiver IDs, when the notification cannot be created, or when not a
// single receiver record could be stored.
//
// NOTE(review): ScheduledAt is persisted but dispatch happens immediately in
// this method — confirm that scheduling is handled elsewhere.
func (p *NotificationProcessorImpl) Send(ctx context.Context, req *models.SendNotificationRequest) (*models.NotificationResponse, error) {
	if req == nil {
		return nil, fmt.Errorf("send notification request is required")
	}
	if len(req.ReceiverIDs) == 0 {
		return nil, fmt.Errorf("at least one receiver_id is required")
	}

	notification := &entities.Notification{
		Title:          req.Title,
		Body:           req.Body,
		Type:           req.Type,
		Category:       req.Category,
		Priority:       req.Priority,
		ImageURL:       req.ImageURL,
		ActionURL:      req.ActionURL,
		NotifiableType: req.NotifiableType,
		NotifiableID:   req.NotifiableID,
		Data:           req.Data,
		ScheduledAt:    req.ScheduledAt,
		ExpiredAt:      req.ExpiredAt,
		CreatedBy:      req.CreatedBy,
	}
	if err := p.notificationRepo.Create(ctx, notification); err != nil {
		return nil, fmt.Errorf("failed to create notification: %w", err)
	}

	// Create receiver records and dispatch FCM per user. Repeated IDs are
	// skipped so a user never gets two receiver rows or two pushes for the
	// same notification.
	seen := make(map[uuid.UUID]struct{}, len(req.ReceiverIDs))
	var (
		created int
		lastErr error
	)
	for _, userID := range req.ReceiverIDs {
		if _, dup := seen[userID]; dup {
			continue
		}
		seen[userID] = struct{}{}

		receiver := &entities.NotificationReceiver{
			NotificationID: notification.ID,
			UserID:         userID,
		}
		if err := p.receiverRepo.Create(ctx, receiver); err != nil {
			// Keep going for the remaining receivers, but remember the
			// failure so a total failure can be reported below.
			lastErr = err
			continue
		}
		created++
		p.dispatchFCMToUser(ctx, receiver, notification)
	}

	// Nothing was stored or dispatched: do not claim the notification was sent.
	if created == 0 {
		return nil, fmt.Errorf("failed to create notification receivers: %w", lastErr)
	}

	// Mark notification as sent. The update error is deliberately ignored:
	// pushes have already gone out, and failing the call here would invite a
	// retry that delivers the same notification twice.
	now := time.Now()
	notification.SentAt = &now
	_ = p.notificationRepo.Update(ctx, notification)

	return mappers.NotificationEntityToResponse(notification), nil
}
// Broadcast sends a notification to all active users in an organization.
//
// The active users of req.OrganizationID are fetched first, then the
// notification is created, receiver records are bulk-created for every user,
// and a push is dispatched per receiver. An error is returned when the
// request is nil, when the users cannot be fetched, when the notification
// cannot be created, or when the receiver bulk insert fails.
//
// When the organization has no active users the notification is still
// created, but the bulk insert and dispatch are skipped.
func (p *NotificationProcessorImpl) Broadcast(ctx context.Context, req *models.BroadcastNotificationRequest) (*models.NotificationResponse, error) {
	if req == nil {
		return nil, fmt.Errorf("broadcast notification request is required")
	}

	users, err := p.userRepo.GetActiveUsers(ctx, req.OrganizationID)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch organization users: %w", err)
	}

	notification := &entities.Notification{
		Title:          req.Title,
		Body:           req.Body,
		Type:           req.Type,
		Category:       req.Category,
		Priority:       req.Priority,
		ImageURL:       req.ImageURL,
		ActionURL:      req.ActionURL,
		NotifiableType: req.NotifiableType,
		NotifiableID:   req.NotifiableID,
		Data:           req.Data,
		ScheduledAt:    req.ScheduledAt,
		ExpiredAt:      req.ExpiredAt,
		CreatedBy:      req.CreatedBy,
	}
	if err := p.notificationRepo.Create(ctx, notification); err != nil {
		return nil, fmt.Errorf("failed to create notification: %w", err)
	}

	// Build receiver records in bulk, skipping nil user entries so a sparse
	// result cannot cause a nil dereference.
	receivers := make([]*entities.NotificationReceiver, 0, len(users))
	for _, u := range users {
		if u == nil {
			continue
		}
		receivers = append(receivers, &entities.NotificationReceiver{
			NotificationID: notification.ID,
			UserID:         u.ID,
		})
	}

	// Only touch the receiver repository when there is something to insert.
	// NOTE(review): some persistence layers reject an empty batch — confirm
	// how the BulkCreate implementation behaves on an empty slice.
	if len(receivers) > 0 {
		if err := p.receiverRepo.BulkCreate(ctx, receivers); err != nil {
			return nil, fmt.Errorf("failed to create notification receivers: %w", err)
		}

		// Dispatch FCM for each receiver.
		for _, receiver := range receivers {
			p.dispatchFCMToUser(ctx, receiver, notification)
		}
	}

	// Mark notification as sent. The update error is deliberately ignored:
	// pushes have already gone out, and failing the call here would invite a
	// retry that broadcasts the same notification twice.
	now := time.Now()
	notification.SentAt = &now
	_ = p.notificationRepo.Update(ctx, notification)

	return mappers.NotificationEntityToResponse(notification), nil
}
// MarkAsRead flags one receiver record as read on behalf of userID.
//
// The record is loaded by receiverID and must belong to userID; otherwise an
// "unauthorized" error is returned. A record that is already read is returned
// unchanged without touching the repository again.
func (p *NotificationProcessorImpl) MarkAsRead(ctx context.Context, receiverID, userID uuid.UUID) (*models.NotificationReceiverResponse, error) {
	record, lookupErr := p.receiverRepo.GetByID(ctx, receiverID)
	switch {
	case lookupErr != nil:
		return nil, fmt.Errorf("notification not found: %w", lookupErr)
	case record.UserID != userID:
		return nil, fmt.Errorf("unauthorized: notification does not belong to user")
	case record.IsRead:
		// Already read: nothing to persist.
		return mappers.NotificationReceiverEntityToResponse(record), nil
	}

	readAt := time.Now()
	record.IsRead, record.ReadAt = true, &readAt
	if updateErr := p.receiverRepo.Update(ctx, record); updateErr != nil {
		return nil, fmt.Errorf("failed to mark notification as read: %w", updateErr)
	}
	return mappers.NotificationReceiverEntityToResponse(record), nil
}
// MarkAllAsRead marks all unread notifications for a user as read.
//
// Unread receiver records are fetched in batches of 1000, always from offset
// 0, because records that were just marked read no longer match the unread
// filter. Every record in a batch is attempted even if an earlier one fails.
// When any update in a batch fails, processing stops after that batch and an
// error wrapping the first failure is returned, so a partial result is never
// reported as success.
//
// The number of batches is capped using the total reported by the first
// listing call, so a repository that keeps returning the same rows cannot
// make this loop forever.
// NOTE(review): this assumes the int64 returned by ListByUserID is the total
// number of matching rows — confirm. If it is not, the method still processes
// at least the first batch.
func (p *NotificationProcessorImpl) MarkAllAsRead(ctx context.Context, userID uuid.UUID) error {
	const batchSize = 1000

	readFilter := false // select records whose IsRead is false
	now := time.Now()
	maxBatches := int64(1)

	for batch := int64(0); batch < maxBatches; batch++ {
		receivers, total, err := p.receiverRepo.ListByUserID(ctx, userID, &readFilter, batchSize, 0)
		if err != nil {
			return fmt.Errorf("failed to fetch unread notifications: %w", err)
		}
		if batch == 0 {
			maxBatches = total/batchSize + 1
		}
		if len(receivers) == 0 {
			return nil
		}

		var (
			failed   int
			firstErr error
		)
		for _, r := range receivers {
			r.IsRead = true
			r.ReadAt = &now
			if err := p.receiverRepo.Update(ctx, r); err != nil {
				failed++
				if firstErr == nil {
					firstErr = err
				}
			}
		}
		if firstErr != nil {
			// Failed rows are still unread and would be fetched again at
			// offset 0, so stop here instead of retrying them in a loop.
			return fmt.Errorf("failed to mark %d of %d notifications as read: %w", failed, len(receivers), firstErr)
		}

		// A short batch means there is nothing left to fetch.
		if len(receivers) < batchSize {
			return nil
		}
	}
	return nil
}
// DeleteForUser soft-deletes the receiver record identified by receiverID.
//
// The record is loaded first and must belong to userID; otherwise an
// "unauthorized" error is returned. The error from the soft delete itself is
// returned to the caller unwrapped.
func (p *NotificationProcessorImpl) DeleteForUser(ctx context.Context, receiverID, userID uuid.UUID) error {
	record, lookupErr := p.receiverRepo.GetByID(ctx, receiverID)
	switch {
	case lookupErr != nil:
		return fmt.Errorf("notification not found: %w", lookupErr)
	case record.UserID != userID:
		return fmt.Errorf("unauthorized: notification does not belong to user")
	}
	return p.receiverRepo.SoftDeleteByID(ctx, receiverID)
}
// ListForUser returns paginated notifications for a user.
// Returns: receivers, total, unreadCount, error
//
// The offset is derived from req.Page and req.Limit. A page below 1 is treated
// as the first page and the offset is never negative, so malformed paging
// input cannot reach the repository as a negative offset. req itself is not
// modified. A nil request yields an error.
//
// The unread count is best-effort: if it cannot be computed it is reported as
// 0 and the listing still succeeds.
func (p *NotificationProcessorImpl) ListForUser(ctx context.Context, req *models.ListNotificationsRequest) ([]*models.NotificationReceiverResponse, int64, int64, error) {
	if req == nil {
		return nil, 0, 0, fmt.Errorf("list notifications request is required")
	}

	page := req.Page
	if page < 1 {
		page = 1
	}
	offset := (page - 1) * req.Limit
	if offset < 0 {
		// Only reachable with a negative Limit; fall back to the start.
		offset = 0
	}

	receivers, total, err := p.receiverRepo.ListByUserID(ctx, req.UserID, req.IsRead, req.Limit, offset)
	if err != nil {
		return nil, 0, 0, fmt.Errorf("failed to list notifications: %w", err)
	}

	unreadCount, err := p.receiverRepo.CountUnreadByUserID(ctx, req.UserID)
	if err != nil {
		// Deliberate best-effort: a failed count must not fail the listing.
		unreadCount = 0
	}

	responses := mappers.NotificationReceiverEntitiesToResponses(receivers)
	return responses, total, unreadCount, nil
}
// GetByID loads one notification by ID and maps it to its response model.
// Any repository error is wrapped as "notification not found".
func (p *NotificationProcessorImpl) GetByID(ctx context.Context, id uuid.UUID) (*models.NotificationResponse, error) {
	entity, lookupErr := p.notificationRepo.GetByID(ctx, id)
	if lookupErr != nil {
		return nil, fmt.Errorf("notification not found: %w", lookupErr)
	}

	response := mappers.NotificationEntityToResponse(entity)
	return response, nil
}
// dispatchFCMToUser pushes the notification to every device of the receiver's
// user that has an FCM token.
//
// It is a silent no-op when no FCM client is configured, when the device
// lookup fails, or when the user has no device with a token. Otherwise one
// pending delivery record per token is bulk-created, a single multicast send
// is attempted, and each record is updated to sent or failed based on the
// multicast result. Repository errors are ignored throughout, so a persistence
// failure never blocks the push itself.
func (p *NotificationProcessorImpl) dispatchFCMToUser(ctx context.Context, receiver *entities.NotificationReceiver, notification *entities.Notification) {
	if p.fcmClient == nil {
		return
	}

	userDevices, lookupErr := p.userDeviceRepo.GetByUserID(ctx, receiver.UserID)
	if lookupErr != nil {
		return
	}
	if len(userDevices) == 0 {
		return
	}

	// Pair every usable token with a pending delivery record.
	fcmTokens := make([]string, 0, len(userDevices))
	records := make([]*entities.NotificationDelivery, 0, len(userDevices))
	for _, d := range userDevices {
		if d.FCMToken == "" {
			continue
		}
		record := &entities.NotificationDelivery{
			NotificationReceiverID: receiver.ID,
			UserDeviceID:           d.ID,
			Channel:                entities.NotificationChannelPush,
			DeliveryStatus:         entities.NotificationDeliveryStatusPending,
			Provider:               entities.NotificationProviderFirebase,
		}
		fcmTokens = append(fcmTokens, d.FCMToken)
		records = append(records, record)
	}
	if len(fcmTokens) == 0 {
		return
	}

	// Data payload delivered alongside the visible title/body.
	payload := map[string]string{
		"notification_id":          notification.ID.String(),
		"notification_receiver_id": receiver.ID.String(),
		"type":                     notification.Type,
		"category":                 notification.Category,
		"action_url":               notification.ActionURL,
	}

	// Store the pending records first, then attempt the multicast.
	_ = p.deliveryRepo.BulkCreate(ctx, records)
	attemptedAt := time.Now()
	sendErr := p.fcmClient.SendMulticastNotification(ctx, fcmTokens, notification.Title, notification.Body, payload)

	// The multicast result is all-or-nothing here, so every record gets the
	// same outcome.
	if sendErr != nil {
		reason := sendErr.Error()
		for _, record := range records {
			record.DeliveryStatus = entities.NotificationDeliveryStatusFailed
			record.FailedAt = &attemptedAt
			record.FailureReason = reason
			_ = p.deliveryRepo.Update(ctx, record)
		}
		return
	}
	for _, record := range records {
		record.DeliveryStatus = entities.NotificationDeliveryStatusSent
		record.SentAt = &attemptedAt
		_ = p.deliveryRepo.Update(ctx, record)
	}
}