339 lines
12 KiB
Go
339 lines
12 KiB
Go
package processor
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"apskel-pos-be/internal/client"
|
|
"apskel-pos-be/internal/entities"
|
|
"apskel-pos-be/internal/mappers"
|
|
"apskel-pos-be/internal/models"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// NotificationRepository is the interface the processor depends on.
|
|
type NotificationRepository interface {
|
|
Create(ctx context.Context, notification *entities.Notification) error
|
|
GetByID(ctx context.Context, id uuid.UUID) (*entities.Notification, error)
|
|
Update(ctx context.Context, notification *entities.Notification) error
|
|
Delete(ctx context.Context, id uuid.UUID) error
|
|
List(ctx context.Context, filters map[string]interface{}, limit, offset int) ([]*entities.Notification, int64, error)
|
|
}
|
|
|
|
// NotificationReceiverRepository is the interface the processor depends on.
|
|
type NotificationReceiverRepository interface {
|
|
Create(ctx context.Context, receiver *entities.NotificationReceiver) error
|
|
BulkCreate(ctx context.Context, receivers []*entities.NotificationReceiver) error
|
|
GetByID(ctx context.Context, id uuid.UUID) (*entities.NotificationReceiver, error)
|
|
GetByNotificationAndUser(ctx context.Context, notificationID, userID uuid.UUID) (*entities.NotificationReceiver, error)
|
|
Update(ctx context.Context, receiver *entities.NotificationReceiver) error
|
|
ListByUserID(ctx context.Context, userID uuid.UUID, isRead *bool, limit, offset int) ([]*entities.NotificationReceiver, int64, error)
|
|
CountUnreadByUserID(ctx context.Context, userID uuid.UUID) (int64, error)
|
|
SoftDeleteByID(ctx context.Context, id uuid.UUID) error
|
|
}
|
|
|
|
// NotificationDeliveryRepository is the interface the processor depends on.
|
|
type NotificationDeliveryRepository interface {
|
|
Create(ctx context.Context, delivery *entities.NotificationDelivery) error
|
|
BulkCreate(ctx context.Context, deliveries []*entities.NotificationDelivery) error
|
|
GetByID(ctx context.Context, id uuid.UUID) (*entities.NotificationDelivery, error)
|
|
Update(ctx context.Context, delivery *entities.NotificationDelivery) error
|
|
ListByReceiverID(ctx context.Context, receiverID uuid.UUID) ([]*entities.NotificationDelivery, error)
|
|
}
|
|
|
|
// NotificationUserRepository is a minimal interface to fetch user devices.
|
|
type NotificationUserDeviceRepository interface {
|
|
GetByUserID(ctx context.Context, userID uuid.UUID) ([]*entities.UserDevice, error)
|
|
}
|
|
|
|
// NotificationUserRepository is a minimal interface to fetch users by org.
|
|
type NotificationUserRepository interface {
|
|
GetActiveUsers(ctx context.Context, organizationID uuid.UUID) ([]*entities.User, error)
|
|
}
|
|
|
|
// NotificationProcessor defines the business logic interface.
|
|
type NotificationProcessor interface {
|
|
Send(ctx context.Context, req *models.SendNotificationRequest) (*models.NotificationResponse, error)
|
|
Broadcast(ctx context.Context, req *models.BroadcastNotificationRequest) (*models.NotificationResponse, error)
|
|
MarkAsRead(ctx context.Context, receiverID, userID uuid.UUID) (*models.NotificationReceiverResponse, error)
|
|
MarkAllAsRead(ctx context.Context, userID uuid.UUID) error
|
|
DeleteForUser(ctx context.Context, receiverID, userID uuid.UUID) error
|
|
ListForUser(ctx context.Context, req *models.ListNotificationsRequest) ([]*models.NotificationReceiverResponse, int64, int64, error)
|
|
GetByID(ctx context.Context, id uuid.UUID) (*models.NotificationResponse, error)
|
|
}
|
|
|
|
type NotificationProcessorImpl struct {
|
|
notificationRepo NotificationRepository
|
|
receiverRepo NotificationReceiverRepository
|
|
deliveryRepo NotificationDeliveryRepository
|
|
userDeviceRepo NotificationUserDeviceRepository
|
|
userRepo NotificationUserRepository
|
|
fcmClient client.FCMClient
|
|
}
|
|
|
|
func NewNotificationProcessor(
|
|
notificationRepo NotificationRepository,
|
|
receiverRepo NotificationReceiverRepository,
|
|
deliveryRepo NotificationDeliveryRepository,
|
|
userDeviceRepo NotificationUserDeviceRepository,
|
|
userRepo NotificationUserRepository,
|
|
fcmClient client.FCMClient,
|
|
) *NotificationProcessorImpl {
|
|
return &NotificationProcessorImpl{
|
|
notificationRepo: notificationRepo,
|
|
receiverRepo: receiverRepo,
|
|
deliveryRepo: deliveryRepo,
|
|
userDeviceRepo: userDeviceRepo,
|
|
userRepo: userRepo,
|
|
fcmClient: fcmClient,
|
|
}
|
|
}
|
|
|
|
// Send creates a notification and dispatches it to the given receiver user IDs via FCM.
|
|
func (p *NotificationProcessorImpl) Send(ctx context.Context, req *models.SendNotificationRequest) (*models.NotificationResponse, error) {
|
|
if len(req.ReceiverIDs) == 0 {
|
|
return nil, fmt.Errorf("at least one receiver_id is required")
|
|
}
|
|
|
|
notification := &entities.Notification{
|
|
Title: req.Title,
|
|
Body: req.Body,
|
|
Type: req.Type,
|
|
Category: req.Category,
|
|
Priority: req.Priority,
|
|
ImageURL: req.ImageURL,
|
|
ActionURL: req.ActionURL,
|
|
NotifiableType: req.NotifiableType,
|
|
NotifiableID: req.NotifiableID,
|
|
Data: req.Data,
|
|
ScheduledAt: req.ScheduledAt,
|
|
ExpiredAt: req.ExpiredAt,
|
|
CreatedBy: req.CreatedBy,
|
|
}
|
|
|
|
if err := p.notificationRepo.Create(ctx, notification); err != nil {
|
|
return nil, fmt.Errorf("failed to create notification: %w", err)
|
|
}
|
|
|
|
// Create receiver records and dispatch FCM per user.
|
|
for _, userID := range req.ReceiverIDs {
|
|
receiver := &entities.NotificationReceiver{
|
|
NotificationID: notification.ID,
|
|
UserID: userID,
|
|
}
|
|
if err := p.receiverRepo.Create(ctx, receiver); err != nil {
|
|
// Log but continue for other receivers.
|
|
continue
|
|
}
|
|
|
|
p.dispatchFCMToUser(ctx, receiver, notification)
|
|
}
|
|
|
|
// Mark notification as sent.
|
|
now := time.Now()
|
|
notification.SentAt = &now
|
|
_ = p.notificationRepo.Update(ctx, notification)
|
|
|
|
return mappers.NotificationEntityToResponse(notification), nil
|
|
}
|
|
|
|
// Broadcast sends a notification to all active users in an organization.
|
|
func (p *NotificationProcessorImpl) Broadcast(ctx context.Context, req *models.BroadcastNotificationRequest) (*models.NotificationResponse, error) {
|
|
users, err := p.userRepo.GetActiveUsers(ctx, req.OrganizationID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch organization users: %w", err)
|
|
}
|
|
|
|
notification := &entities.Notification{
|
|
Title: req.Title,
|
|
Body: req.Body,
|
|
Type: req.Type,
|
|
Category: req.Category,
|
|
Priority: req.Priority,
|
|
ImageURL: req.ImageURL,
|
|
ActionURL: req.ActionURL,
|
|
NotifiableType: req.NotifiableType,
|
|
NotifiableID: req.NotifiableID,
|
|
Data: req.Data,
|
|
ScheduledAt: req.ScheduledAt,
|
|
ExpiredAt: req.ExpiredAt,
|
|
CreatedBy: req.CreatedBy,
|
|
}
|
|
|
|
if err := p.notificationRepo.Create(ctx, notification); err != nil {
|
|
return nil, fmt.Errorf("failed to create notification: %w", err)
|
|
}
|
|
|
|
// Build receiver records in bulk.
|
|
receivers := make([]*entities.NotificationReceiver, 0, len(users))
|
|
for _, u := range users {
|
|
receivers = append(receivers, &entities.NotificationReceiver{
|
|
NotificationID: notification.ID,
|
|
UserID: u.ID,
|
|
})
|
|
}
|
|
|
|
if err := p.receiverRepo.BulkCreate(ctx, receivers); err != nil {
|
|
return nil, fmt.Errorf("failed to create notification receivers: %w", err)
|
|
}
|
|
|
|
// Dispatch FCM for each receiver.
|
|
for _, receiver := range receivers {
|
|
p.dispatchFCMToUser(ctx, receiver, notification)
|
|
}
|
|
|
|
now := time.Now()
|
|
notification.SentAt = &now
|
|
_ = p.notificationRepo.Update(ctx, notification)
|
|
|
|
return mappers.NotificationEntityToResponse(notification), nil
|
|
}
|
|
|
|
// MarkAsRead marks a single notification receiver record as read.
|
|
func (p *NotificationProcessorImpl) MarkAsRead(ctx context.Context, receiverID, userID uuid.UUID) (*models.NotificationReceiverResponse, error) {
|
|
receiver, err := p.receiverRepo.GetByID(ctx, receiverID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("notification not found: %w", err)
|
|
}
|
|
|
|
if receiver.UserID != userID {
|
|
return nil, fmt.Errorf("unauthorized: notification does not belong to user")
|
|
}
|
|
|
|
if !receiver.IsRead {
|
|
now := time.Now()
|
|
receiver.IsRead = true
|
|
receiver.ReadAt = &now
|
|
if err := p.receiverRepo.Update(ctx, receiver); err != nil {
|
|
return nil, fmt.Errorf("failed to mark notification as read: %w", err)
|
|
}
|
|
}
|
|
|
|
return mappers.NotificationReceiverEntityToResponse(receiver), nil
|
|
}
|
|
|
|
// MarkAllAsRead marks all unread notifications for a user as read.
|
|
func (p *NotificationProcessorImpl) MarkAllAsRead(ctx context.Context, userID uuid.UUID) error {
|
|
isRead := false
|
|
receivers, _, err := p.receiverRepo.ListByUserID(ctx, userID, &isRead, 1000, 0)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to fetch unread notifications: %w", err)
|
|
}
|
|
|
|
now := time.Now()
|
|
for _, r := range receivers {
|
|
r.IsRead = true
|
|
r.ReadAt = &now
|
|
_ = p.receiverRepo.Update(ctx, r)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteForUser soft-deletes a notification receiver record for a user.
|
|
func (p *NotificationProcessorImpl) DeleteForUser(ctx context.Context, receiverID, userID uuid.UUID) error {
|
|
receiver, err := p.receiverRepo.GetByID(ctx, receiverID)
|
|
if err != nil {
|
|
return fmt.Errorf("notification not found: %w", err)
|
|
}
|
|
|
|
if receiver.UserID != userID {
|
|
return fmt.Errorf("unauthorized: notification does not belong to user")
|
|
}
|
|
|
|
return p.receiverRepo.SoftDeleteByID(ctx, receiverID)
|
|
}
|
|
|
|
// ListForUser returns paginated notifications for a user.
|
|
// Returns: receivers, total, unreadCount, error
|
|
func (p *NotificationProcessorImpl) ListForUser(ctx context.Context, req *models.ListNotificationsRequest) ([]*models.NotificationReceiverResponse, int64, int64, error) {
|
|
offset := (req.Page - 1) * req.Limit
|
|
|
|
receivers, total, err := p.receiverRepo.ListByUserID(ctx, req.UserID, req.IsRead, req.Limit, offset)
|
|
if err != nil {
|
|
return nil, 0, 0, fmt.Errorf("failed to list notifications: %w", err)
|
|
}
|
|
|
|
unreadCount, err := p.receiverRepo.CountUnreadByUserID(ctx, req.UserID)
|
|
if err != nil {
|
|
unreadCount = 0
|
|
}
|
|
|
|
responses := mappers.NotificationReceiverEntitiesToResponses(receivers)
|
|
return responses, total, unreadCount, nil
|
|
}
|
|
|
|
// GetByID returns a single notification by its ID.
|
|
func (p *NotificationProcessorImpl) GetByID(ctx context.Context, id uuid.UUID) (*models.NotificationResponse, error) {
|
|
notification, err := p.notificationRepo.GetByID(ctx, id)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("notification not found: %w", err)
|
|
}
|
|
return mappers.NotificationEntityToResponse(notification), nil
|
|
}
|
|
|
|
// dispatchFCMToUser fetches all FCM tokens for a user and sends the push notification.
|
|
func (p *NotificationProcessorImpl) dispatchFCMToUser(ctx context.Context, receiver *entities.NotificationReceiver, notification *entities.Notification) {
|
|
if p.fcmClient == nil {
|
|
return
|
|
}
|
|
|
|
devices, err := p.userDeviceRepo.GetByUserID(ctx, receiver.UserID)
|
|
if err != nil || len(devices) == 0 {
|
|
return
|
|
}
|
|
|
|
// Build FCM data payload.
|
|
data := map[string]string{
|
|
"notification_id": notification.ID.String(),
|
|
"notification_receiver_id": receiver.ID.String(),
|
|
"type": notification.Type,
|
|
"category": notification.Category,
|
|
"action_url": notification.ActionURL,
|
|
}
|
|
|
|
// Collect valid FCM tokens and create delivery records.
|
|
tokens := make([]string, 0, len(devices))
|
|
deliveries := make([]*entities.NotificationDelivery, 0, len(devices))
|
|
|
|
for _, device := range devices {
|
|
if device.FCMToken == "" {
|
|
continue
|
|
}
|
|
tokens = append(tokens, device.FCMToken)
|
|
deliveries = append(deliveries, &entities.NotificationDelivery{
|
|
NotificationReceiverID: receiver.ID,
|
|
UserDeviceID: device.ID,
|
|
Channel: entities.NotificationChannelPush,
|
|
DeliveryStatus: entities.NotificationDeliveryStatusPending,
|
|
Provider: entities.NotificationProviderFirebase,
|
|
})
|
|
}
|
|
|
|
if len(tokens) == 0 {
|
|
return
|
|
}
|
|
|
|
// Persist delivery records before sending.
|
|
_ = p.deliveryRepo.BulkCreate(ctx, deliveries)
|
|
|
|
// Send via FCM multicast.
|
|
now := time.Now()
|
|
sendErr := p.fcmClient.SendMulticastNotification(ctx, tokens, notification.Title, notification.Body, data)
|
|
|
|
// Update delivery status.
|
|
for _, delivery := range deliveries {
|
|
if sendErr != nil {
|
|
delivery.DeliveryStatus = entities.NotificationDeliveryStatusFailed
|
|
delivery.FailedAt = &now
|
|
delivery.FailureReason = sendErr.Error()
|
|
} else {
|
|
delivery.DeliveryStatus = entities.NotificationDeliveryStatusSent
|
|
delivery.SentAt = &now
|
|
}
|
|
_ = p.deliveryRepo.Update(ctx, delivery)
|
|
}
|
|
}
|