Pipes

A pipe is a kernel object that allows a thread to send a byte stream to another thread. Pipes can be used to transfer chunks of data in whole or in part, and either synchronously or asynchronously.

Concepts

The pipe can be configured with a ring buffer which holds data that has been sent but not yet received; alternatively, the pipe may have no ring buffer.

Any number of pipes can be defined. Each pipe is referenced by its memory address.

A pipe has the following key property:

  • A size that indicates the size of the pipe’s ring buffer. Note that a size of zero defines a pipe with no ring buffer.

A pipe must be initialized before it can be used. The pipe is initially empty.

Data can be synchronously sent either in whole or in part to a pipe by a thread. If the specified minimum number of bytes can not be immediately satisfied, then the operation will either fail immediately or attempt to send as many bytes as possible and then pend in the hope that the send can be completed later. Accepted data is either copied to the pipe’s ring buffer or directly to the waiting reader(s).

Data can be asynchronously sent in whole using a memory block to a pipe by a thread. Once the pipe has accepted all the bytes in the memory block, it will free the memory block and may give a semaphore if one was specified.

Data can be synchronously received from a pipe by a thread. If the specified minimum number of bytes can not be immediately satisfied, then the operation will either fail immediately or attempt to receive as many bytes as possible and then pend in the hope that the receive can be completed later. Accepted data is either copied from the pipe’s ring buffer or directly from the waiting sender(s).

Note

The kernel does NOT allow for an ISR to send or receive data to/from a pipe even if it does not attempt to wait for space/data.

Implementation

A pipe is defined using a variable of type struct k_pipe and an optional character buffer of type unsigned char. It must then be initialized by calling k_pipe_init().

The following code defines and initializes an empty pipe that has a ring buffer capable of holding 100 bytes and is aligned to a 4-byte boundary.

unsigned char __aligned(4) my_ring_buffer[100];
struct k_pipe my_pipe;

k_pipe_init(&my_pipe, my_ring_buffer, sizeof(my_ring_buffer));

Alternatively, a pipe can be defined and initialized at compile time by calling K_PIPE_DEFINE.

The following code has the same effect as the code segment above. Observe that that macro defines both the pipe and its ring buffer.

K_PIPE_DEFINE(my_pipe, 100, 4);

Writing to a Pipe

Data is added to a pipe by calling k_pipe_put().

The following code builds on the example above, and uses the pipe to pass data from a producing thread to one or more consuming threads. If the pipe’s ring buffer fills up because the consumers can’t keep up, the producing thread waits for a specified amount of time.

struct message_header {
    ...
};

void producer_thread(void)
{
    unsigned char *data;
    size_t total_size;
    size_t bytes_written;
    int    rc;
    ...

    while (1) {
        /* Craft message to send in the pipe */
        data = ...;
        total_size = ...;

        /* send data to the consumers */
        rc = k_pipe_put(&my_pipe, data, total_size, &bytes_written,
                        sizeof(struct message_header), K_NO_WAIT);

        if (rc < 0) {
            /* Incomplete message header sent */
            ...
        } else if (bytes_written < total_size) {
            /* Some of the data was sent */
            ...
        } else {
            /* All data sent */
            ...
        }
    }
}

Reading from a Pipe

Data is read from the pipe by calling k_pipe_get().

The following code builds on the example above, and uses the pipe to process data items generated by one or more producing threads.

void consumer_thread(void)
{
    unsigned char buffer[120];
    size_t   bytes_read;
    struct message_header  *header = (struct message_header *)buffer;

    while (1) {
        rc = k_pipe_get(&my_pipe, buffer, sizeof(buffer), &bytes_read,
                        sizeof(header), K_MSEC(100));

        if ((rc < 0) || (bytes_read < sizeof (header))) {
            /* Incomplete message header received */
            ...
        } else if (header->num_data_bytes + sizeof(header) > bytes_read) {
            /* Only some data was received */
            ...
        } else {
            /* All data was received */
            ...
        }
    }
}

Suggested uses

Use a pipe to send streams of data between threads.

Note

A pipe can be used to transfer long streams of data if desired. However it is often preferable to send pointers to large data items to avoid copying the data. The kernel’s memory map and memory pool object types can be helpful for data transfers of this sort.

Configuration Options

Related configuration options:

APIs

The following message queue APIs are provided by kernel.h: