mirror of
https://github.com/Qortal/Qortal-Hub.git
synced 2025-06-07 17:06:58 +00:00
212 lines
6.6 KiB
TypeScript
212 lines
6.6 KiB
TypeScript
import {
|
|
createContext,
|
|
useContext,
|
|
useState,
|
|
useCallback,
|
|
useRef,
|
|
} from 'react';
|
|
import ShortUniqueId from 'short-unique-id';
|
|
|
|
const MessageQueueContext = createContext(null);
|
|
const uid = new ShortUniqueId({ length: 8 });
|
|
|
|
export const useMessageQueue = () => useContext(MessageQueueContext);
|
|
|
|
export const MessageQueueProvider = ({ children }) => {
|
|
const messageQueueRef = useRef([]);
|
|
const [queueChats, setQueueChats] = useState({}); // Stores chats and status for display
|
|
const maxRetries = 2;
|
|
|
|
const clearStatesMessageQueueProvider = useCallback(() => {
|
|
setQueueChats({});
|
|
messageQueueRef.current = [];
|
|
}, []);
|
|
|
|
// Promise-based lock to prevent concurrent executions
|
|
const processingPromiseRef = useRef(Promise.resolve());
|
|
|
|
// Function to add a message to the queue
|
|
const addToQueue = useCallback(
|
|
(sendMessageFunc, messageObj, type, groupDirectId) => {
|
|
const tempId = uid.rnd();
|
|
const chatData = {
|
|
...messageObj,
|
|
type,
|
|
groupDirectId,
|
|
signature: uid.rnd(),
|
|
identifier: tempId,
|
|
retries: 0, // Retry count for display purposes
|
|
status: 'pending', // Initial status is 'pending'
|
|
};
|
|
|
|
// Add chat data to queueChats for status tracking
|
|
setQueueChats((prev) => ({
|
|
...prev,
|
|
[groupDirectId]: [...(prev[groupDirectId] || []), chatData],
|
|
}));
|
|
|
|
// Add the message to the global messageQueueRef
|
|
messageQueueRef.current.push({
|
|
func: sendMessageFunc,
|
|
identifier: tempId,
|
|
groupDirectId,
|
|
specialId: messageObj?.message?.specialId,
|
|
});
|
|
|
|
// Start processing the queue
|
|
processQueue([], groupDirectId);
|
|
},
|
|
[]
|
|
);
|
|
|
|
// Function to process the message queue
|
|
const processQueue = useCallback((newMessages = [], groupDirectId) => {
|
|
processingPromiseRef.current = processingPromiseRef.current
|
|
.then(() => processQueueInternal(newMessages, groupDirectId))
|
|
.catch((err) => console.error('Error in processQueue:', err));
|
|
}, []);
|
|
|
|
// Internal function to handle queue processing
|
|
const processQueueInternal = async (newMessages, groupDirectId) => {
|
|
// Remove any messages from the queue that match the specialId from newMessages
|
|
|
|
// If the queue is empty, no need to process
|
|
if (messageQueueRef.current.length === 0) return;
|
|
|
|
// Process messages sequentially
|
|
while (messageQueueRef.current.length > 0) {
|
|
const currentMessage = messageQueueRef.current[0]; // Get the first message in the queue
|
|
const { groupDirectId, identifier } = currentMessage;
|
|
|
|
// Update the chat status to 'sending'
|
|
setQueueChats((prev) => {
|
|
const updatedChats = { ...prev };
|
|
if (updatedChats[groupDirectId]) {
|
|
const chatIndex = updatedChats[groupDirectId].findIndex(
|
|
(item) => item.identifier === identifier
|
|
);
|
|
if (chatIndex !== -1) {
|
|
updatedChats[groupDirectId][chatIndex].status = 'sending';
|
|
}
|
|
}
|
|
return updatedChats;
|
|
});
|
|
|
|
try {
|
|
// Execute the function stored in the messageQueueRef
|
|
|
|
await currentMessage.func();
|
|
|
|
// Remove the message from the queue after successful sending
|
|
messageQueueRef.current.shift();
|
|
} catch (error) {
|
|
console.error('Message sending failed', error);
|
|
|
|
// Retry logic
|
|
setQueueChats((prev) => {
|
|
const updatedChats = { ...prev };
|
|
const chatIndex = updatedChats[groupDirectId]?.findIndex(
|
|
(item) => item.identifier === identifier
|
|
);
|
|
if (chatIndex !== -1) {
|
|
const retries = updatedChats[groupDirectId][chatIndex].retries;
|
|
if (retries < maxRetries) {
|
|
// Increment retry count and set status to 'failed'
|
|
updatedChats[groupDirectId][chatIndex].retries += 1;
|
|
updatedChats[groupDirectId][chatIndex].status = 'failed';
|
|
} else {
|
|
// Max retries reached, set status to 'failed-permanent'
|
|
updatedChats[groupDirectId][chatIndex].status =
|
|
'failed-permanent';
|
|
|
|
// Remove the message from the queue after max retries
|
|
messageQueueRef.current.shift();
|
|
}
|
|
}
|
|
return updatedChats;
|
|
});
|
|
}
|
|
|
|
// Optional delay between processing messages
|
|
// await new Promise((res) => setTimeout(res, 5000));
|
|
}
|
|
};
|
|
|
|
// Method to process with new messages and groupDirectId
|
|
const processWithNewMessages = (newMessages, groupDirectId) => {
|
|
let updatedNewMessages = newMessages;
|
|
if (newMessages.length > 0) {
|
|
// Remove corresponding entries in queueChats for the provided groupDirectId
|
|
setQueueChats((prev) => {
|
|
const updatedChats = { ...prev };
|
|
if (updatedChats[groupDirectId]) {
|
|
updatedNewMessages = newMessages?.map((msg) => {
|
|
const findTempMsg = updatedChats[groupDirectId]?.find(
|
|
(msg2) => msg2?.message?.specialId === msg?.specialId
|
|
);
|
|
if (findTempMsg) {
|
|
return {
|
|
...msg,
|
|
tempSignature: findTempMsg?.signature,
|
|
};
|
|
}
|
|
return msg;
|
|
});
|
|
|
|
updatedChats[groupDirectId] = updatedChats[groupDirectId].filter(
|
|
(chat) => {
|
|
return !newMessages.some(
|
|
(newMsg) => newMsg?.specialId === chat?.message?.specialId
|
|
);
|
|
}
|
|
);
|
|
|
|
// Remove messages with status 'failed-permanent'
|
|
updatedChats[groupDirectId] = updatedChats[groupDirectId].filter(
|
|
(chat) => {
|
|
return chat?.status !== 'failed-permanent';
|
|
}
|
|
);
|
|
|
|
// If no more chats for this group, delete the groupDirectId entry
|
|
if (updatedChats[groupDirectId].length === 0) {
|
|
delete updatedChats[groupDirectId];
|
|
}
|
|
}
|
|
return updatedChats;
|
|
});
|
|
}
|
|
setTimeout(() => {
|
|
if (
|
|
!messageQueueRef.current.find(
|
|
(msg) => msg?.groupDirectId === groupDirectId
|
|
)
|
|
) {
|
|
setQueueChats((prev) => {
|
|
const updatedChats = { ...prev };
|
|
if (updatedChats[groupDirectId]) {
|
|
delete updatedChats[groupDirectId];
|
|
}
|
|
|
|
return updatedChats;
|
|
});
|
|
}
|
|
}, 300);
|
|
|
|
return updatedNewMessages;
|
|
};
|
|
|
|
return (
|
|
<MessageQueueContext.Provider
|
|
value={{
|
|
addToQueue,
|
|
queueChats,
|
|
clearStatesMessageQueueProvider,
|
|
processWithNewMessages,
|
|
}}
|
|
>
|
|
{children}
|
|
</MessageQueueContext.Provider>
|
|
);
|
|
};
|