mirror of https://github.com/Qortal/Brooklyn
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
389 lines
9.9 KiB
389 lines
9.9 KiB
/* |
|
Copyright (c) 2012, Broadcom Europe Ltd |
|
All rights reserved. |
|
|
|
Redistribution and use in source and binary forms, with or without |
|
modification, are permitted provided that the following conditions are met: |
|
* Redistributions of source code must retain the above copyright |
|
notice, this list of conditions and the following disclaimer. |
|
* Redistributions in binary form must reproduce the above copyright |
|
notice, this list of conditions and the following disclaimer in the |
|
documentation and/or other materials provided with the distribution. |
|
* Neither the name of the copyright holder nor the |
|
names of its contributors may be used to endorse or promote products |
|
derived from this software without specific prior written permission. |
|
|
|
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND |
|
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
|
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY |
|
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES |
|
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
|
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND |
|
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
|
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
*/ |
|
|
|
#include "vcos.h" |
|
#include "vcos_msgqueue.h" |
|
#include <stddef.h> |
|
#include <string.h> |
|
#include <stdio.h> |
|
|
|
#define MAGIC VCOS_MSGQ_MAGIC |
|
|
|
/* Probably a good idea for MSG_T to be multiple of 8 so that doubles |
|
* are naturally aligned without problem. |
|
*/ |
|
vcos_static_assert((sizeof(VCOS_MSG_T) & 7) == 0); |
|
|
|
static void vcos_msgq_pool_on_reply(VCOS_MSG_WAITER_T *waiter, |
|
VCOS_MSG_T *msg); |
|
static void vcos_msgq_queue_waiter_on_reply(VCOS_MSG_WAITER_T *waiter, |
|
VCOS_MSG_T *msg); |
|
|
|
/** Simple reply protocol. The client creates a semaphore and waits |
|
* for it. No queuing of multiple replies is possible but nothing needs |
|
* to be setup in advance. Because creating semaphores is very fast on |
|
* VideoCore there's no need to do anything elaborate to optimize create |
|
* time - this might need revisiting on other platforms. |
|
*/ |
|
|
|
typedef struct |
|
{ |
|
VCOS_MSG_WAITER_T waiter; |
|
VCOS_SEMAPHORE_T waitsem; |
|
} VCOS_MSG_SIMPLE_WAITER_T; |
|
|
|
static void vcos_msgq_simple_waiter_on_reply(VCOS_MSG_WAITER_T *waiter, |
|
VCOS_MSG_T *msg) |
|
{ |
|
VCOS_MSG_SIMPLE_WAITER_T *self; |
|
(void)msg; |
|
self = (VCOS_MSG_SIMPLE_WAITER_T*)waiter; |
|
vcos_semaphore_post(&self->waitsem); |
|
} |
|
|
|
static VCOS_STATUS_T vcos_msgq_simple_waiter_init(VCOS_MSG_SIMPLE_WAITER_T *waiter) |
|
{ |
|
VCOS_STATUS_T status; |
|
status = vcos_semaphore_create(&waiter->waitsem, "waiter", 0); |
|
waiter->waiter.on_reply = vcos_msgq_simple_waiter_on_reply; |
|
return status; |
|
} |
|
|
|
static void vcos_msgq_simple_waiter_deinit(VCOS_MSG_SIMPLE_WAITER_T *waiter) |
|
{ |
|
vcos_semaphore_delete(&waiter->waitsem); |
|
} |
|
|
|
/* |
|
* Message queues |
|
*/ |
|
|
|
static VCOS_STATUS_T vcos_msgq_create_internal(VCOS_MSGQUEUE_T *q, const char *name) |
|
{ |
|
VCOS_STATUS_T st; |
|
|
|
memset(q, 0, sizeof(*q)); |
|
|
|
q->waiter.on_reply = vcos_msgq_queue_waiter_on_reply; |
|
st = vcos_semaphore_create(&q->sem, name, 0); |
|
if (st != VCOS_SUCCESS) |
|
goto fail_sem; |
|
|
|
st = vcos_mutex_create(&q->lock, name); |
|
if (st != VCOS_SUCCESS) |
|
goto fail_mtx; |
|
|
|
return st; |
|
|
|
fail_mtx: |
|
vcos_semaphore_delete(&q->sem); |
|
fail_sem: |
|
return st; |
|
} |
|
|
|
static void vcos_msgq_delete_internal(VCOS_MSGQUEUE_T *q) |
|
{ |
|
vcos_semaphore_delete(&q->sem); |
|
vcos_mutex_delete(&q->lock); |
|
} |
|
|
|
VCOS_STATUS_T vcos_msgq_create(VCOS_MSGQUEUE_T *q, const char *name) |
|
{ |
|
VCOS_STATUS_T st; |
|
|
|
st = vcos_msgq_create_internal(q, name); |
|
|
|
return st; |
|
} |
|
|
|
void vcos_msgq_delete(VCOS_MSGQUEUE_T *q) |
|
{ |
|
vcos_msgq_delete_internal(q); |
|
} |
|
|
|
/* append a message to a message queue */ |
|
static _VCOS_INLINE void msgq_append(VCOS_MSGQUEUE_T *q, VCOS_MSG_T *msg) |
|
{ |
|
vcos_mutex_lock(&q->lock); |
|
if (q->head == NULL) |
|
{ |
|
q->head = q->tail = msg; |
|
} |
|
else |
|
{ |
|
q->tail->next = msg; |
|
q->tail = msg; |
|
} |
|
vcos_mutex_unlock(&q->lock); |
|
} |
|
|
|
/* |
|
* A waiter for a message queue. Just appends the message to the |
|
* queue, waking up the waiting thread. |
|
*/ |
|
static void vcos_msgq_queue_waiter_on_reply(VCOS_MSG_WAITER_T *waiter, |
|
VCOS_MSG_T *msg) |
|
{ |
|
VCOS_MSGQUEUE_T *queue = (VCOS_MSGQUEUE_T*)waiter; |
|
msgq_append(queue, msg); |
|
vcos_semaphore_post(&queue->sem); |
|
} |
|
|
|
/* initialise this library */ |
|
|
|
VCOS_STATUS_T vcos_msgq_init(void) |
|
{ |
|
return VCOS_SUCCESS; |
|
} |
|
|
|
void vcos_msgq_deinit(void) |
|
{ |
|
} |
|
|
|
static _VCOS_INLINE |
|
void vcos_msg_send_helper(VCOS_MSG_WAITER_T *waiter, |
|
VCOS_MSGQUEUE_T *dest, |
|
uint32_t code, |
|
VCOS_MSG_T *msg) |
|
{ |
|
vcos_assert(msg); |
|
vcos_assert(dest); |
|
|
|
msg->code = code; |
|
if (waiter) |
|
msg->waiter = waiter; |
|
msg->next = NULL; |
|
msg->src_thread = vcos_thread_current(); |
|
|
|
msgq_append(dest, msg); |
|
vcos_semaphore_post(&dest->sem); |
|
} |
|
|
|
/* wait on a queue for a message */ |
|
VCOS_MSG_T *vcos_msg_wait(VCOS_MSGQUEUE_T *queue) |
|
{ |
|
VCOS_MSG_T *msg; |
|
vcos_semaphore_wait(&queue->sem); |
|
vcos_mutex_lock(&queue->lock); |
|
|
|
msg = queue->head; |
|
vcos_assert(msg); /* should always be a message here! */ |
|
|
|
queue->head = msg->next; |
|
if (queue->head == NULL) |
|
queue->tail = NULL; |
|
|
|
vcos_mutex_unlock(&queue->lock); |
|
return msg; |
|
} |
|
|
|
/* peek on a queue for a message */ |
|
VCOS_MSG_T *vcos_msg_peek(VCOS_MSGQUEUE_T *queue) |
|
{ |
|
VCOS_MSG_T *msg; |
|
vcos_mutex_lock(&queue->lock); |
|
|
|
msg = queue->head; |
|
|
|
/* if there's a message, remove it from the queue */ |
|
if (msg) |
|
{ |
|
queue->head = msg->next; |
|
if (queue->head == NULL) |
|
queue->tail = NULL; |
|
|
|
/* keep the semaphore count consistent */ |
|
|
|
/* coverity[lock_order] |
|
* the semaphore must have a non-zero count so cannot block here. |
|
*/ |
|
vcos_semaphore_wait(&queue->sem); |
|
} |
|
|
|
vcos_mutex_unlock(&queue->lock); |
|
return msg; |
|
} |
|
|
|
void vcos_msg_send(VCOS_MSGQUEUE_T *dest, uint32_t code, VCOS_MSG_T *msg) |
|
{ |
|
vcos_assert(msg->magic == MAGIC); |
|
vcos_msg_send_helper(NULL, dest, code, msg); |
|
} |
|
|
|
/** Send on to the target queue, then wait on a simple waiter for the reply |
|
*/ |
|
VCOS_STATUS_T vcos_msg_sendwait(VCOS_MSGQUEUE_T *dest, uint32_t code, VCOS_MSG_T *msg) |
|
{ |
|
VCOS_STATUS_T st; |
|
VCOS_MSG_SIMPLE_WAITER_T waiter; |
|
|
|
vcos_assert(msg->magic == MAGIC); |
|
|
|
/* if this fires, you've set a waiter up but are now about to obliterate it |
|
* with the 'wait for a reply' waiter. |
|
*/ |
|
vcos_assert(msg->waiter == NULL); |
|
|
|
if ((st=vcos_msgq_simple_waiter_init(&waiter)) != VCOS_SUCCESS) |
|
return st; |
|
|
|
vcos_msg_send_helper(&waiter.waiter, dest, code, msg); |
|
vcos_semaphore_wait(&waiter.waitsem); |
|
vcos_msgq_simple_waiter_deinit(&waiter); |
|
|
|
return VCOS_SUCCESS; |
|
} |
|
|
|
/** Send a reply to a message |
|
*/ |
|
void vcos_msg_reply(VCOS_MSG_T *msg) |
|
{ |
|
vcos_assert(msg->magic == MAGIC); |
|
msg->code |= MSG_REPLY_BIT; |
|
if (msg->waiter) |
|
{ |
|
msg->waiter->on_reply(msg->waiter, msg); |
|
} |
|
else |
|
{ |
|
VCOS_ALERT("%s: reply to non-reply message id %d", |
|
VCOS_FUNCTION, |
|
msg->code); |
|
vcos_assert(0); |
|
} |
|
} |
|
|
|
void vcos_msg_set_source(VCOS_MSG_T *msg, VCOS_MSGQUEUE_T *queue) |
|
{ |
|
vcos_assert(msg); |
|
vcos_assert(msg->magic == MAGIC); |
|
vcos_assert(queue); |
|
msg->waiter = &queue->waiter; |
|
} |
|
|
|
/* |
|
* Message pools |
|
*/ |
|
|
|
VCOS_STATUS_T vcos_msgq_pool_create(VCOS_MSGQ_POOL_T *pool, |
|
size_t count, |
|
size_t payload_size, |
|
const char *name) |
|
{ |
|
VCOS_STATUS_T status; |
|
int bp_size = payload_size + sizeof(VCOS_MSG_T); |
|
status = vcos_blockpool_create_on_heap(&pool->blockpool, |
|
count, bp_size, |
|
VCOS_BLOCKPOOL_ALIGN_DEFAULT, |
|
0, |
|
name); |
|
if (status != VCOS_SUCCESS) |
|
goto fail_pool; |
|
|
|
status = vcos_semaphore_create(&pool->sem, name, count); |
|
if (status != VCOS_SUCCESS) |
|
goto fail_sem; |
|
|
|
pool->waiter.on_reply = vcos_msgq_pool_on_reply; |
|
pool->magic = MAGIC; |
|
return status; |
|
|
|
fail_sem: |
|
vcos_blockpool_delete(&pool->blockpool); |
|
fail_pool: |
|
return status; |
|
} |
|
|
|
void vcos_msgq_pool_delete(VCOS_MSGQ_POOL_T *pool) |
|
{ |
|
vcos_blockpool_delete(&pool->blockpool); |
|
vcos_semaphore_delete(&pool->sem); |
|
} |
|
|
|
/** Called when a message from a pool is replied-to. Just returns |
|
* the message back to the blockpool. |
|
*/ |
|
static void vcos_msgq_pool_on_reply(VCOS_MSG_WAITER_T *waiter, |
|
VCOS_MSG_T *msg) |
|
{ |
|
vcos_unused(waiter); |
|
vcos_assert(msg->magic == MAGIC); |
|
vcos_msgq_pool_free(msg); |
|
} |
|
|
|
VCOS_MSG_T *vcos_msgq_pool_alloc(VCOS_MSGQ_POOL_T *pool) |
|
{ |
|
VCOS_MSG_T *msg; |
|
if (vcos_semaphore_trywait(&pool->sem) == VCOS_SUCCESS) |
|
{ |
|
msg = vcos_blockpool_calloc(&pool->blockpool); |
|
vcos_assert(msg); |
|
msg->magic = MAGIC; |
|
msg->waiter = &pool->waiter; |
|
msg->pool = pool; |
|
} |
|
else |
|
{ |
|
msg = NULL; |
|
} |
|
return msg; |
|
} |
|
|
|
void vcos_msgq_pool_free(VCOS_MSG_T *msg) |
|
{ |
|
if (msg) |
|
{ |
|
VCOS_MSGQ_POOL_T *pool; |
|
vcos_assert(msg->pool); |
|
|
|
pool = msg->pool; |
|
vcos_assert(msg->pool->magic == MAGIC); |
|
|
|
vcos_blockpool_free(msg); |
|
vcos_semaphore_post(&pool->sem); |
|
} |
|
} |
|
|
|
VCOS_MSG_T *vcos_msgq_pool_wait(VCOS_MSGQ_POOL_T *pool) |
|
{ |
|
VCOS_MSG_T *msg; |
|
vcos_semaphore_wait(&pool->sem); |
|
msg = vcos_blockpool_calloc(&pool->blockpool); |
|
vcos_assert(msg); |
|
msg->magic = MAGIC; |
|
msg->waiter = &pool->waiter; |
|
msg->pool = pool; |
|
return msg; |
|
} |
|
|
|
void vcos_msg_init(VCOS_MSG_T *msg) |
|
{ |
|
msg->magic = MAGIC; |
|
msg->next = NULL; |
|
msg->waiter = NULL; |
|
msg->pool = NULL; |
|
}
|
|
|