package processor

import (
	"context"
	"fmt"
	"time"

	"apskel-pos-be/internal/client"
	"apskel-pos-be/internal/entities"
	"apskel-pos-be/internal/mappers"
	"apskel-pos-be/internal/models"
	"github.com/google/uuid"
)

// NotificationRepository is the persistence contract for notification
// records that the processor depends on.
type NotificationRepository interface {
	// Create stores a new notification. Callers in this file read
	// notification.ID right after Create, so implementations are expected
	// to populate it.
	Create(ctx context.Context, notification *entities.Notification) error

	// GetByID looks up a single notification by its ID.
	GetByID(ctx context.Context, id uuid.UUID) (*entities.Notification, error)

	// Update persists changes made to an existing notification.
	Update(ctx context.Context, notification *entities.Notification) error

	// Delete removes the notification with the given ID.
	Delete(ctx context.Context, id uuid.UUID) error

	// List returns notifications matching filters, windowed by limit and
	// offset. The int64 result is presumably the total match count; confirm
	// against the implementation.
	List(ctx context.Context, filters map[string]interface{}, limit, offset int) ([]*entities.Notification, int64, error)
}

// NotificationReceiverRepository is the persistence contract for per-user
// notification receiver records that the processor depends on.
type NotificationReceiverRepository interface {
	// Create stores a single receiver record.
	Create(ctx context.Context, receiver *entities.NotificationReceiver) error

	// BulkCreate stores several receiver records in one call.
	BulkCreate(ctx context.Context, receivers []*entities.NotificationReceiver) error

	// GetByID looks up a receiver record by its own ID.
	GetByID(ctx context.Context, id uuid.UUID) (*entities.NotificationReceiver, error)

	// GetByNotificationAndUser looks up the receiver record that links the
	// given notification to the given user.
	GetByNotificationAndUser(ctx context.Context, notificationID, userID uuid.UUID) (*entities.NotificationReceiver, error)

	// Update persists changes made to an existing receiver record.
	Update(ctx context.Context, receiver *entities.NotificationReceiver) error

	// ListByUserID returns a user's receiver records windowed by limit and
	// offset. isRead is an optional read-state filter; this file passes a
	// pointer to false to select unread records, and treats the int64
	// result as the total count.
	ListByUserID(ctx context.Context, userID uuid.UUID, isRead *bool, limit, offset int) ([]*entities.NotificationReceiver, int64, error)

	// CountUnreadByUserID returns the number of unread receiver records for
	// the user.
	CountUnreadByUserID(ctx context.Context, userID uuid.UUID) (int64, error)

	// SoftDeleteByID soft-deletes the receiver record with the given ID.
	SoftDeleteByID(ctx context.Context, id uuid.UUID) error
}

// NotificationDeliveryRepository is the interface the processor depends on.
type NotificationDeliveryRepository interface { Create(ctx context.Context, delivery *entities.NotificationDelivery) error BulkCreate(ctx context.Context, deliveries []*entities.NotificationDelivery) error GetByID(ctx context.Context, id uuid.UUID) (*entities.NotificationDelivery, error) Update(ctx context.Context, delivery *entities.NotificationDelivery) error ListByReceiverID(ctx context.Context, receiverID uuid.UUID) ([]*entities.NotificationDelivery, error) } // NotificationUserRepository is a minimal interface to fetch user devices. type NotificationUserDeviceRepository interface { GetByUserID(ctx context.Context, userID uuid.UUID) ([]*entities.UserDevice, error) } // NotificationUserRepository is a minimal interface to fetch users by org. type NotificationUserRepository interface { GetActiveUsers(ctx context.Context, organizationID uuid.UUID) ([]*entities.User, error) } // NotificationProcessor defines the business logic interface. type NotificationProcessor interface { Send(ctx context.Context, req *models.SendNotificationRequest) (*models.NotificationResponse, error) Broadcast(ctx context.Context, req *models.BroadcastNotificationRequest) (*models.NotificationResponse, error) MarkAsRead(ctx context.Context, receiverID, userID uuid.UUID) (*models.NotificationReceiverResponse, error) MarkAllAsRead(ctx context.Context, userID uuid.UUID) error DeleteForUser(ctx context.Context, receiverID, userID uuid.UUID) error ListForUser(ctx context.Context, req *models.ListNotificationsRequest) ([]*models.NotificationReceiverResponse, int64, int64, error) GetByID(ctx context.Context, id uuid.UUID) (*models.NotificationResponse, error) } type NotificationProcessorImpl struct { notificationRepo NotificationRepository receiverRepo NotificationReceiverRepository deliveryRepo NotificationDeliveryRepository userDeviceRepo NotificationUserDeviceRepository userRepo NotificationUserRepository fcmClient client.FCMClient } func NewNotificationProcessor( notificationRepo 
NotificationRepository, receiverRepo NotificationReceiverRepository, deliveryRepo NotificationDeliveryRepository, userDeviceRepo NotificationUserDeviceRepository, userRepo NotificationUserRepository, fcmClient client.FCMClient, ) *NotificationProcessorImpl { return &NotificationProcessorImpl{ notificationRepo: notificationRepo, receiverRepo: receiverRepo, deliveryRepo: deliveryRepo, userDeviceRepo: userDeviceRepo, userRepo: userRepo, fcmClient: fcmClient, } } // Send creates a notification and dispatches it to the given receiver user IDs via FCM. func (p *NotificationProcessorImpl) Send(ctx context.Context, req *models.SendNotificationRequest) (*models.NotificationResponse, error) { if len(req.ReceiverIDs) == 0 { return nil, fmt.Errorf("at least one receiver_id is required") } notification := &entities.Notification{ Title: req.Title, Body: req.Body, Type: req.Type, Category: req.Category, Priority: req.Priority, ImageURL: req.ImageURL, ActionURL: req.ActionURL, NotifiableType: req.NotifiableType, NotifiableID: req.NotifiableID, Data: req.Data, ScheduledAt: req.ScheduledAt, ExpiredAt: req.ExpiredAt, CreatedBy: req.CreatedBy, } if err := p.notificationRepo.Create(ctx, notification); err != nil { return nil, fmt.Errorf("failed to create notification: %w", err) } // Create receiver records and dispatch FCM per user. for _, userID := range req.ReceiverIDs { receiver := &entities.NotificationReceiver{ NotificationID: notification.ID, UserID: userID, } if err := p.receiverRepo.Create(ctx, receiver); err != nil { // Log but continue for other receivers. continue } p.dispatchFCMToUser(ctx, receiver, notification) } // Mark notification as sent. now := time.Now() notification.SentAt = &now _ = p.notificationRepo.Update(ctx, notification) return mappers.NotificationEntityToResponse(notification), nil } // Broadcast sends a notification to all active users in an organization. 
func (p *NotificationProcessorImpl) Broadcast(ctx context.Context, req *models.BroadcastNotificationRequest) (*models.NotificationResponse, error) { users, err := p.userRepo.GetActiveUsers(ctx, req.OrganizationID) if err != nil { return nil, fmt.Errorf("failed to fetch organization users: %w", err) } notification := &entities.Notification{ Title: req.Title, Body: req.Body, Type: req.Type, Category: req.Category, Priority: req.Priority, ImageURL: req.ImageURL, ActionURL: req.ActionURL, NotifiableType: req.NotifiableType, NotifiableID: req.NotifiableID, Data: req.Data, ScheduledAt: req.ScheduledAt, ExpiredAt: req.ExpiredAt, CreatedBy: req.CreatedBy, } if err := p.notificationRepo.Create(ctx, notification); err != nil { return nil, fmt.Errorf("failed to create notification: %w", err) } // Build receiver records in bulk. receivers := make([]*entities.NotificationReceiver, 0, len(users)) for _, u := range users { receivers = append(receivers, &entities.NotificationReceiver{ NotificationID: notification.ID, UserID: u.ID, }) } if err := p.receiverRepo.BulkCreate(ctx, receivers); err != nil { return nil, fmt.Errorf("failed to create notification receivers: %w", err) } // Dispatch FCM for each receiver. for _, receiver := range receivers { p.dispatchFCMToUser(ctx, receiver, notification) } now := time.Now() notification.SentAt = &now _ = p.notificationRepo.Update(ctx, notification) return mappers.NotificationEntityToResponse(notification), nil } // MarkAsRead marks a single notification receiver record as read. 
func (p *NotificationProcessorImpl) MarkAsRead(ctx context.Context, receiverID, userID uuid.UUID) (*models.NotificationReceiverResponse, error) { receiver, err := p.receiverRepo.GetByID(ctx, receiverID) if err != nil { return nil, fmt.Errorf("notification not found: %w", err) } if receiver.UserID != userID { return nil, fmt.Errorf("unauthorized: notification does not belong to user") } if !receiver.IsRead { now := time.Now() receiver.IsRead = true receiver.ReadAt = &now if err := p.receiverRepo.Update(ctx, receiver); err != nil { return nil, fmt.Errorf("failed to mark notification as read: %w", err) } } return mappers.NotificationReceiverEntityToResponse(receiver), nil } // MarkAllAsRead marks all unread notifications for a user as read. func (p *NotificationProcessorImpl) MarkAllAsRead(ctx context.Context, userID uuid.UUID) error { isRead := false receivers, _, err := p.receiverRepo.ListByUserID(ctx, userID, &isRead, 1000, 0) if err != nil { return fmt.Errorf("failed to fetch unread notifications: %w", err) } now := time.Now() for _, r := range receivers { r.IsRead = true r.ReadAt = &now _ = p.receiverRepo.Update(ctx, r) } return nil } // DeleteForUser soft-deletes a notification receiver record for a user. func (p *NotificationProcessorImpl) DeleteForUser(ctx context.Context, receiverID, userID uuid.UUID) error { receiver, err := p.receiverRepo.GetByID(ctx, receiverID) if err != nil { return fmt.Errorf("notification not found: %w", err) } if receiver.UserID != userID { return fmt.Errorf("unauthorized: notification does not belong to user") } return p.receiverRepo.SoftDeleteByID(ctx, receiverID) } // ListForUser returns paginated notifications for a user. 
// Returns: receivers, total, unreadCount, error func (p *NotificationProcessorImpl) ListForUser(ctx context.Context, req *models.ListNotificationsRequest) ([]*models.NotificationReceiverResponse, int64, int64, error) { offset := (req.Page - 1) * req.Limit receivers, total, err := p.receiverRepo.ListByUserID(ctx, req.UserID, req.IsRead, req.Limit, offset) if err != nil { return nil, 0, 0, fmt.Errorf("failed to list notifications: %w", err) } unreadCount, err := p.receiverRepo.CountUnreadByUserID(ctx, req.UserID) if err != nil { unreadCount = 0 } responses := mappers.NotificationReceiverEntitiesToResponses(receivers) return responses, total, unreadCount, nil } // GetByID returns a single notification by its ID. func (p *NotificationProcessorImpl) GetByID(ctx context.Context, id uuid.UUID) (*models.NotificationResponse, error) { notification, err := p.notificationRepo.GetByID(ctx, id) if err != nil { return nil, fmt.Errorf("notification not found: %w", err) } return mappers.NotificationEntityToResponse(notification), nil } // dispatchFCMToUser fetches all FCM tokens for a user and sends the push notification. func (p *NotificationProcessorImpl) dispatchFCMToUser(ctx context.Context, receiver *entities.NotificationReceiver, notification *entities.Notification) { if p.fcmClient == nil { return } devices, err := p.userDeviceRepo.GetByUserID(ctx, receiver.UserID) if err != nil || len(devices) == 0 { return } // Build FCM data payload. data := map[string]string{ "notification_id": notification.ID.String(), "notification_receiver_id": receiver.ID.String(), "type": notification.Type, "category": notification.Category, "action_url": notification.ActionURL, } // Collect valid FCM tokens and create delivery records. 
tokens := make([]string, 0, len(devices)) deliveries := make([]*entities.NotificationDelivery, 0, len(devices)) for _, device := range devices { if device.FCMToken == "" { continue } tokens = append(tokens, device.FCMToken) deliveries = append(deliveries, &entities.NotificationDelivery{ NotificationReceiverID: receiver.ID, UserDeviceID: device.ID, Channel: entities.NotificationChannelPush, DeliveryStatus: entities.NotificationDeliveryStatusPending, Provider: entities.NotificationProviderFirebase, }) } if len(tokens) == 0 { return } // Persist delivery records before sending. _ = p.deliveryRepo.BulkCreate(ctx, deliveries) // Send via FCM multicast. now := time.Now() sendErr := p.fcmClient.SendMulticastNotification(ctx, tokens, notification.Title, notification.Body, data) // Update delivery status. for _, delivery := range deliveries { if sendErr != nil { delivery.DeliveryStatus = entities.NotificationDeliveryStatusFailed delivery.FailedAt = &now delivery.FailureReason = sendErr.Error() } else { delivery.DeliveryStatus = entities.NotificationDeliveryStatusSent delivery.SentAt = &now } _ = p.deliveryRepo.Update(ctx, delivery) } }