
Organize multithreading data logging [closed]

I’m working on a C project for an embedded target running a Linux distribution (built with Yocto). I’m new to the embedded Linux world, and I have to build a data logging system.

I’m already learning how to use threads, and I’m thinking about how to organize the project.

Here is what I imagine:

  • multiple threads to collect data from different interfaces: CAN bus, I2C… (different sample rates)
  • one thread with a period of 200 ms to populate CSV files
  • one thread with a period of 3 seconds to send data with HTTP requests
  • Threads will stop on CAN info or an external event

I don’t know the best way to organize this project. I see two ways. In the first, a startup program creates each thread, then waits in a while loop, watching for events, to stop them. In the second, a startup program executes other binaries as threads. In both cases, I don’t know how to share data between threads.

Can you share your experience?

Thank you

EDIT: First, thanks a lot to @Glärbo for the explanations. They were really helpful for learning the mechanics of multithreading.

I’ve tested it with success.

For future readers, I’ve drawn diagrams to illustrate @Glärbo’s answer.

[Diagrams: main thread; producer (sensor) thread; datalogger thread]


Answer

I would do it simpler, using a simple multiple producers, single consumer approach.

Let’s assume each data item can be described using a single numerical value:

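#include <errno.h>    /* ENOMEM */
#include <pthread.h>  /* mutexes and condition variables */
#include <stdlib.h>   /* malloc(), free() */
#include <time.h>     /* struct timespec, clock_gettime() */

struct value {
    struct value   *next;  /* Forming a singly-linked list of data items */
    struct sensor  *from;  /* Identifies which sensor value this is */
    struct timespec when;  /* Time of sensor reading in UTC */
    double          value; /* Numerical value */
};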

I would use two lists of values: one for sensor readings received but not stored, and one for unused value buckets. This way you don’t need to dynamically allocate or free value buckets, unless you want to (by manipulating the unused list).

Both lists are protected by a mutex. Since the unused list may be empty, we need a condition variable (that is signaled on whenever a new unused value is added to it) so that threads can wait for one to become available. The received list similarly needs a condition variable, so that if it happens to be empty when the consumer (data storer) wants them, it can wait for at least one to appear.

static pthread_mutex_t  unused_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t   unused_wait = PTHREAD_COND_INITIALIZER;
static struct value    *unused_list = NULL;

static pthread_mutex_t  received_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t   received_wait = PTHREAD_COND_INITIALIZER;
static struct value    *received_list = NULL;

For the unused list, we need three helpers. The first creates new unused value items from scratch. You call it initially to create, say, two or three value items per sensor, plus a few; and later on whenever you think you need more (say, if you add new sensors at run time):

int unused_create(void)
{
    struct value *v;

    v = malloc(sizeof *v);
    if (!v)
        return ENOMEM;

    v->from = NULL;

    pthread_mutex_lock(&unused_lock);
    v->next = unused_list;
    unused_list = v;
    pthread_cond_signal(&unused_wait);
    pthread_mutex_unlock(&unused_lock);

    return 0;
}

The other two are needed to get and put value items from/back to the list:

struct value *unused_get(void)
{
    struct value *v;

    pthread_mutex_lock(&unused_lock);
    while (!unused_list)
        pthread_cond_wait(&unused_wait, &unused_lock);
    v = unused_list;
    unused_list = unused_list->next;
    pthread_mutex_unlock(&unused_lock);

    v->from = NULL;

    return v;
}

void unused_put(struct value *v)
{
    v->from = NULL;

    pthread_mutex_lock(&unused_lock);
    v->next = unused_list;
    unused_list = v;
    pthread_cond_signal(&unused_wait);
    pthread_mutex_unlock(&unused_lock);
}

The idea above is that when the from member is NULL, the item is unused (as it is not from any sensor). Technically, we don’t need to clear it to NULL at every stage, but I like to be thorough: it’s not like setting it is a costly operation.
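To make the initial pool sizing concrete, here is a minimal sketch of priming the unused list with a few buckets per sensor plus some spares. unused_prime() and unused_count() are hypothetical helper names that are not part of the code above, and unused_create() is repeated only so the sketch compiles on its own.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>

struct sensor;  /* contents are not needed for this sketch */

struct value {
    struct value   *next;
    struct sensor  *from;
    struct timespec when;
    double          value;
};

static pthread_mutex_t  unused_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t   unused_wait = PTHREAD_COND_INITIALIZER;
static struct value    *unused_list = NULL;

/* Same as unused_create() above */
int unused_create(void)
{
    struct value *v = malloc(sizeof *v);
    if (!v)
        return ENOMEM;

    v->from = NULL;

    pthread_mutex_lock(&unused_lock);
    v->next = unused_list;
    unused_list = v;
    pthread_cond_signal(&unused_wait);
    pthread_mutex_unlock(&unused_lock);

    return 0;
}

/* Hypothetical helper: create per_sensor buckets for each sensor, plus
   extra spares. Returns 0, or ENOMEM at the first failed allocation. */
int unused_prime(size_t sensors, size_t per_sensor, size_t extra)
{
    size_t n;

    assert(per_sensor > 0);  /* each sensor needs at least one bucket */

    n = sensors * per_sensor + extra;
    while (n-- > 0) {
        int err = unused_create();
        if (err)
            return err;
    }
    return 0;
}

/* Hypothetical diagnostic: number of buckets currently on the unused list */
size_t unused_count(void)
{
    size_t n = 0;

    pthread_mutex_lock(&unused_lock);
    for (struct value *v = unused_list; v; v = v->next)
        n++;
    pthread_mutex_unlock(&unused_lock);

    return n;
}
```

For example, unused_prime(4, 3, 2) would create three buckets for each of four sensors plus two spares. How many you really need depends on how long the consumer holds items compared to the fastest sample rate.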

Sensor-accessing producers take the sensor reading, get the current time using e.g. clock_gettime(CLOCK_REALTIME, &timespec), and then use unused_get() to grab a new unused item. (The order is important, because unused_get() may take some time, if there are no free items.) Then, they fill in the fields, and call the following received_put() to prepend the reading to the list:

void received_put(struct value *v)
{
    pthread_mutex_lock(&received_lock);
    v->next = received_list;
    received_list = v;
    pthread_cond_signal(&received_wait);
    pthread_mutex_unlock(&received_lock);
}
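Putting the producer steps together, one sampling step could look roughly like the sketch below. read_sensor() and produce_once() are hypothetical names, and read_sensor() just returns a fixed number in place of a real CAN or I2C read. The list helpers are repeated so that the sketch compiles on its own.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <time.h>

struct sensor { const char *name; };  /* reduced to a name for this sketch */

struct value {
    struct value   *next;
    struct sensor  *from;
    struct timespec when;
    double          value;
};

static pthread_mutex_t  unused_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t   unused_wait = PTHREAD_COND_INITIALIZER;
static struct value    *unused_list = NULL;

static pthread_mutex_t  received_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t   received_wait = PTHREAD_COND_INITIALIZER;
static struct value    *received_list = NULL;

struct value *unused_get(void)
{
    struct value *v;

    pthread_mutex_lock(&unused_lock);
    while (!unused_list)
        pthread_cond_wait(&unused_wait, &unused_lock);
    v = unused_list;
    unused_list = unused_list->next;
    pthread_mutex_unlock(&unused_lock);

    v->from = NULL;
    return v;
}

void unused_put(struct value *v)
{
    v->from = NULL;

    pthread_mutex_lock(&unused_lock);
    v->next = unused_list;
    unused_list = v;
    pthread_cond_signal(&unused_wait);
    pthread_mutex_unlock(&unused_lock);
}

void received_put(struct value *v)
{
    assert(v != NULL);

    pthread_mutex_lock(&received_lock);
    v->next = received_list;
    received_list = v;
    pthread_cond_signal(&received_wait);
    pthread_mutex_unlock(&received_lock);
}

/* Hypothetical stand-in for a real CAN or I2C read */
double read_sensor(struct sensor *s)
{
    (void)s;
    return 42.0;
}

/* One producer step: read and timestamp first, then grab a bucket,
   because unused_get() may block if no free bucket is available. */
void produce_once(struct sensor *s)
{
    struct timespec now;
    struct value   *v;
    double          reading;

    reading = read_sensor(s);
    clock_gettime(CLOCK_REALTIME, &now);

    v = unused_get();
    v->from  = s;
    v->when  = now;
    v->value = reading;
    received_put(v);
}
```

A sensor thread would call produce_once() in a loop at its own sample rate.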

There is only one thread that periodically collects all received sensor readings, and stores them. It can keep a set of most recent readings, and send those periodically. Instead of calling some received_get() repeatedly until there are no more received values not handled yet, we should use a function that returns the whole list of them:

struct value *received_getall(void)
{
    struct value *v;
    pthread_mutex_lock(&received_lock);
    while (!received_list)
        pthread_cond_wait(&received_wait, &received_lock);
    v = received_list;
    received_list = NULL;
    pthread_mutex_unlock(&received_lock);
    return v;
}

The consumer thread, which stores/sends the summaries and readings, should obtain the whole list, then handle the items one by one. After each item has been processed, it should be returned to the unused list. In other words, something like

    struct value *all, *v;

    while (1) {
        all = received_getall();
        while (all) {
            v = all;
            all = all->next;
            v->next = NULL;

            /* Store/summarize value item v */

            unused_put(v);
        }
    }

As you can see, while the consumer thread is handling the sensor value items, the sensor threads can add new readings for the next round, as long as there are enough free value item buckets to use.
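For the "Store/summarize" step, one option is to format each item as a CSV line before writing it to the file. The sketch below is only an illustration: value_to_csv() is a hypothetical helper, and the column layout (name, timestamp, value) is an assumption, not something the question specifies.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

struct sensor { const char *name; };  /* reduced to a name for this sketch */

struct value {
    struct value   *next;
    struct sensor  *from;
    struct timespec when;
    double          value;
};

/* Format one reading as "name,seconds.nanoseconds,value\n".
   Returns the number of characters written (excluding the final NUL),
   or -1 if the buffer is too small or formatting fails. */
int value_to_csv(char *buf, size_t size, const struct value *v)
{
    int n;

    assert(buf != NULL && v != NULL);

    n = snprintf(buf, size, "%s,%lld.%09ld,%.6f\n",
                 v->from ? v->from->name : "unknown",
                 (long long)v->when.tv_sec,
                 (long)v->when.tv_nsec,
                 v->value);
    if (n < 0 || (size_t)n >= size)
        return -1;

    return n;
}
```

The consumer can then pass the buffer to fputs() or fwrite() on the open CSV file. Note that received_put() prepends, so the list returned by received_getall() is in newest-first order; sort or reverse it first if the file should be chronological.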

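Of course, you can also allocate many values with a single malloc() call, but then you must somehow remember which pool each value belongs to in order to free them. (With this layout, standalone values from unused_create() must have their owner member set to NULL.) So: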

struct value {
    struct value   *next;  /* Forming a singly-linked list of data items */
    struct owner   *owner; /* Part of which value array, NULL if standalone */
    struct sensor  *from;  /* Identifies which sensor value this is */
    struct timespec when;  /* Time of sensor reading in UTC */
    double          value; /* Numerical value */
};

/* Defined after struct value, because the flexible array member
   needs the complete element type */
struct owner {
    size_t          size;    /* Number of values in the array */
    size_t          used;    /* Number of values not freed yet */
    struct value    value[];
};

int unused_add_array(const size_t size)
{
    struct owner *o;
    size_t        i;

    if (size < 1)
        return EINVAL;

    o = malloc(sizeof (struct owner) + size * sizeof (struct value));
    if (!o)
        return ENOMEM;

    o->size = size;
    o->used = size;

    /* Mark every bucket as unused and as belonging to this array */
    for (i = 0; i < size; i++) {
        o->value[i].owner = o;
        o->value[i].from = NULL;
    }

    i = size - 1;

    pthread_mutex_lock(&unused_lock);
    /* Chain the array elements together, in front of the existing list */
    o->value[i].next = unused_list;
    while (i-->0)
        o->value[i].next = o->value + i + 1;
    unused_list = o->value;
    pthread_cond_broadcast(&unused_wait);
    pthread_mutex_unlock(&unused_lock);
    return 0;
}

/* Instead of unused_put(), call unused_free() to discard a value */
void unused_free(struct value *v)
{
    pthread_mutex_lock(&unused_lock);
    v->from = NULL;

    if (!v->owner) {
        free(v);                    /* Standalone value */
    } else if (v->owner->used > 1) {
        v->owner->used--;           /* Array still has values not freed yet */
    } else {
        v->owner->size = 0;
        v->owner->used = 0;
        free(v->owner);             /* Last value: free the whole array */
    }

    pthread_mutex_unlock(&unused_lock);
}

The reason unused_free() uses unused_lock is that we must be sure that no other thread is accessing the bucket when we free it. Otherwise, we can have a race window, where the other thread may use the value after we free()d it.

Remember that the Linux C library, like most other C libraries, does not necessarily return dynamically allocated memory to the operating system at free(); memory is only returned if it is large enough to matter. (Currently on x86 and x86-64, the default glibc limit is about 128 KiB; anything smaller is left in the process heap, and used to satisfy future malloc()/calloc()/realloc() calls.)

The contents of the struct sensor are up to you, but personally, I’d put at least

struct sensor {
    pthread_t   worker;
    int         connfd;  /* Device or socket descriptor */
    const char *name;    /* Some kind of identifier, perhaps header in CSV */
    const char *units;   /* Optional, could be useful */
};

plus possibly sensor reading interval (in, say, milliseconds) in it.
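If each sensor carries its own interval in milliseconds, a worker can sleep until absolute deadlines so that the sample period does not drift with the time spent reading. This is only a sketch under that assumption; timespec_add_ms(), sleep_until() and run_periodic() are hypothetical helper names.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <time.h>

/* Advance *t by ms milliseconds, keeping tv_nsec within [0, 1e9) */
void timespec_add_ms(struct timespec *t, long ms)
{
    assert(ms >= 0);

    t->tv_sec  += ms / 1000;
    t->tv_nsec += (ms % 1000) * 1000000L;
    if (t->tv_nsec >= 1000000000L) {
        t->tv_nsec -= 1000000000L;
        t->tv_sec++;
    }
}

/* Sleep until the absolute CLOCK_MONOTONIC time *deadline.
   clock_nanosleep() returns the error number directly, so retry on EINTR. */
void sleep_until(const struct timespec *deadline)
{
    while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, deadline, NULL) == EINTR)
        ;
}

/* Call sample() count times, interval_ms apart. Each deadline is computed
   from the previous deadline, not from "now", so delays do not accumulate. */
void run_periodic(long interval_ms, int count, void (*sample)(void))
{
    struct timespec next;

    clock_gettime(CLOCK_MONOTONIC, &next);
    while (count-- > 0) {
        sample();
        timespec_add_ms(&next, interval_ms);
        sleep_until(&next);
    }
}
```

The deadlines use CLOCK_MONOTONIC so that wall-clock adjustments do not disturb the period, while the readings themselves can still be timestamped with CLOCK_REALTIME.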

In practice, because there is only one consumer thread, I’d use the main thread for it.
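As for starting and stopping the sensor threads from the main thread, one simple possibility is a shared stop flag that every worker checks on each iteration. The sketch below assumes C11 atomics are available; struct worker, worker_main(), workers_start() and workers_stop() are hypothetical names, and the worker body only counts iterations in place of real sensor reads.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>

struct worker {
    pthread_t  thread;
    atomic_int samples;  /* Number of iterations done so far */
};

/* Set by the main thread when the CAN message or external event arrives */
static atomic_bool stop_requested;

void *worker_main(void *arg)
{
    struct worker *w = arg;

    while (!atomic_load(&stop_requested)) {
        /* A real worker would read its sensor, queue the value, and sleep */
        atomic_fetch_add(&w->samples, 1);
        sched_yield();
    }
    return NULL;
}

/* Start n workers. Returns 0, or the pthread_create() error code after
   stopping and joining the workers that were already started. */
int workers_start(struct worker *w, int n)
{
    assert(n > 0);

    atomic_store(&stop_requested, false);
    for (int i = 0; i < n; i++) {
        atomic_init(&w[i].samples, 0);
        int err = pthread_create(&w[i].thread, NULL, worker_main, &w[i]);
        if (err) {
            atomic_store(&stop_requested, true);
            while (i-- > 0)
                pthread_join(w[i].thread, NULL);
            return err;
        }
    }
    return 0;
}

/* Ask all workers to stop, then wait for each of them to exit */
void workers_stop(struct worker *w, int n)
{
    atomic_store(&stop_requested, true);
    for (int i = 0; i < n; i++)
        pthread_join(w[i].thread, NULL);
}
```

The main thread would call workers_start(), run the consumer loop until the stop event, and then call workers_stop(). Keep in mind that a worker blocked inside unused_get() does not see the flag until it gets a bucket, so the pool should be large enough that producers rarely wait.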
