|
|
@ -12,12 +12,11 @@ export const MessageQueueProvider = ({ children }) => { |
|
|
|
const [queueChats, setQueueChats] = useState({}); // Stores chats and status for display
|
|
|
|
const [queueChats, setQueueChats] = useState({}); // Stores chats and status for display
|
|
|
|
const isProcessingRef = useRef(false); // To track if the queue is being processed
|
|
|
|
const isProcessingRef = useRef(false); // To track if the queue is being processed
|
|
|
|
const maxRetries = 4; |
|
|
|
const maxRetries = 4; |
|
|
|
|
|
|
|
|
|
|
|
const clearStatesMessageQueueProvider = useCallback(() => { |
|
|
|
const clearStatesMessageQueueProvider = useCallback(() => { |
|
|
|
setQueueChats({}) |
|
|
|
setQueueChats({}); |
|
|
|
messageQueue = [] |
|
|
|
messageQueue = []; |
|
|
|
isProcessingRef.current = false |
|
|
|
isProcessingRef.current = false; |
|
|
|
}, []) |
|
|
|
}, []); |
|
|
|
|
|
|
|
|
|
|
|
// Function to add a message to the queue
|
|
|
|
// Function to add a message to the queue
|
|
|
|
const addToQueue = useCallback((sendMessageFunc, messageObj, type, groupDirectId) => { |
|
|
|
const addToQueue = useCallback((sendMessageFunc, messageObj, type, groupDirectId) => { |
|
|
@ -40,16 +39,43 @@ export const MessageQueueProvider = ({ children }) => { |
|
|
|
// Add the message to the global messageQueue
|
|
|
|
// Add the message to the global messageQueue
|
|
|
|
messageQueue = [ |
|
|
|
messageQueue = [ |
|
|
|
...messageQueue, |
|
|
|
...messageQueue, |
|
|
|
{ func: sendMessageFunc, identifier: tempId, groupDirectId } |
|
|
|
{ func: sendMessageFunc, identifier: tempId, groupDirectId, specialId: messageObj?.message?.specialId } |
|
|
|
]; |
|
|
|
]; |
|
|
|
|
|
|
|
|
|
|
|
// Start processing the queue if not already processing
|
|
|
|
// Start processing the queue if not already processing
|
|
|
|
processQueue(); |
|
|
|
processQueue(); |
|
|
|
}, []); |
|
|
|
}, []); |
|
|
|
|
|
|
|
|
|
|
|
// Function to process the messageQueue
|
|
|
|
// Method to process with new messages and groupDirectId
|
|
|
|
// Function to process the messageQueue
|
|
|
|
const processWithNewMessages = (newMessages, groupDirectId) => { |
|
|
|
const processQueue = useCallback(async () => { |
|
|
|
processQueue(newMessages, groupDirectId); |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Function to process the messageQueue and handle new messages
|
|
|
|
|
|
|
|
const processQueue = useCallback(async (newMessages = [], groupDirectId) => { |
|
|
|
|
|
|
|
// Filter out any message in the queue that matches the specialId from newMessages
|
|
|
|
|
|
|
|
messageQueue = messageQueue.filter((msg) => { |
|
|
|
|
|
|
|
return !newMessages.some(newMsg => newMsg?.specialId === msg?.specialId); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Remove any corresponding entries in queueChats for the provided groupDirectId
|
|
|
|
|
|
|
|
setQueueChats((prev) => { |
|
|
|
|
|
|
|
const updatedChats = { ...prev }; |
|
|
|
|
|
|
|
if (updatedChats[groupDirectId]) { |
|
|
|
|
|
|
|
// Remove any message in queueChats that has a matching specialId
|
|
|
|
|
|
|
|
updatedChats[groupDirectId] = updatedChats[groupDirectId].filter((chat) => { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return !newMessages.some(newMsg => newMsg?.specialId === chat?.message?.specialId); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// If no more chats for this group, delete the groupDirectId entry
|
|
|
|
|
|
|
|
if (updatedChats[groupDirectId].length === 0) { |
|
|
|
|
|
|
|
delete updatedChats[groupDirectId]; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return updatedChats; |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
// If currently processing or the queue is empty, return
|
|
|
|
// If currently processing or the queue is empty, return
|
|
|
|
if (isProcessingRef.current || messageQueue.length === 0) return; |
|
|
|
if (isProcessingRef.current || messageQueue.length === 0) return; |
|
|
|
|
|
|
|
|
|
|
@ -77,18 +103,17 @@ const processQueue = useCallback(async () => { |
|
|
|
// Execute the function stored in the messageQueue
|
|
|
|
// Execute the function stored in the messageQueue
|
|
|
|
await currentMessage.func(); |
|
|
|
await currentMessage.func(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Remove the message from the messageQueue after successful sending
|
|
|
|
// Remove the message from the messageQueue after successful sending
|
|
|
|
messageQueue = messageQueue.slice(1); |
|
|
|
messageQueue = messageQueue.slice(1); // Slice here remains for successful messages
|
|
|
|
|
|
|
|
|
|
|
|
// Remove the message from queueChats after success
|
|
|
|
// Remove the message from queueChats after success
|
|
|
|
setQueueChats((prev) => { |
|
|
|
// setQueueChats((prev) => {
|
|
|
|
const updatedChats = { ...prev }; |
|
|
|
// const updatedChats = { ...prev };
|
|
|
|
updatedChats[groupDirectId] = updatedChats[groupDirectId].filter( |
|
|
|
// updatedChats[groupDirectId] = updatedChats[groupDirectId].filter(
|
|
|
|
(item) => item.identifier !== identifier |
|
|
|
// (item) => item.identifier !== identifier
|
|
|
|
); |
|
|
|
// );
|
|
|
|
return updatedChats; |
|
|
|
// return updatedChats;
|
|
|
|
}); |
|
|
|
// });
|
|
|
|
} catch (error) { |
|
|
|
} catch (error) { |
|
|
|
console.error('Message sending failed', error); |
|
|
|
console.error('Message sending failed', error); |
|
|
|
|
|
|
|
|
|
|
@ -109,7 +134,7 @@ const processQueue = useCallback(async () => { |
|
|
|
updatedChats[groupDirectId][chatIndex].status = 'failed-permanent'; |
|
|
|
updatedChats[groupDirectId][chatIndex].status = 'failed-permanent'; |
|
|
|
|
|
|
|
|
|
|
|
// Remove the message from the messageQueue after max retries
|
|
|
|
// Remove the message from the messageQueue after max retries
|
|
|
|
messageQueue = messageQueue.slice(1); |
|
|
|
messageQueue = messageQueue.slice(1); // Slice for failed messages after max retries
|
|
|
|
|
|
|
|
|
|
|
|
// Remove the message from queueChats after failure
|
|
|
|
// Remove the message from queueChats after failure
|
|
|
|
updatedChats[groupDirectId] = updatedChats[groupDirectId].filter( |
|
|
|
updatedChats[groupDirectId] = updatedChats[groupDirectId].filter( |
|
|
@ -129,9 +154,8 @@ const processQueue = useCallback(async () => { |
|
|
|
isProcessingRef.current = false; |
|
|
|
isProcessingRef.current = false; |
|
|
|
}, []); |
|
|
|
}, []); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return ( |
|
|
|
return ( |
|
|
|
<MessageQueueContext.Provider value={{ addToQueue, queueChats, clearStatesMessageQueueProvider }}> |
|
|
|
<MessageQueueContext.Provider value={{ addToQueue, queueChats, clearStatesMessageQueueProvider, processWithNewMessages }}> |
|
|
|
{children} |
|
|
|
{children} |
|
|
|
</MessageQueueContext.Provider> |
|
|
|
</MessageQueueContext.Provider> |
|
|
|
); |
|
|
|
); |
|
|
|