
Passing Variable Length C String Through Mmap-ed Shared Memory

Let’s say I have a process A and a process B, and process A would like to pass a C string to process B through shared memory set up with shm_open() + mmap().

What’s the most latency efficient way?

The answer of this post suggested that after C++11, std::atomic is the right way to share data over shared memory.

However, I fail to see how I can write a C string with something like this:

struct Buffer {
std::atomic<uint32_t> length;
std::atomic<char*> str;
} __attribute__((packed));

Given I have a shared memory created this way:

class SHM {
    char* _ptr;
public:
    SHM() {
        const auto handle = shm_open("myTest", O_RDWR|O_CREAT, 0666);
        const auto size =  4 * 1024 * 1024;
        if (-1 == ftruncate(handle, size)) {
            throw;
        }
        _ptr = (char*)mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);

        if(_ptr == MAP_FAILED){
            throw;
        }

        int rc = fchmod(handle, 0666);
        if (rc == -1) {
            throw;
        }
    }

    // assume the caller will do buffer.length.store(someLength, std::memory_order_release); after filling up Buffer::str
    Buffer& getBuffer() noexcept {
        return *reinterpret_cast<Buffer*>(_ptr);
    }

    Buffer& read() {
        auto& buffer = *reinterpret_cast<Buffer*>(_ptr);
        while (buffer.length.load(std::memory_order_acquire) > 0) {
            buffer.str.load(std::memory_order_relaxed);
            return buffer;
        }
    }
};

How can the caller of SHM::getBuffer() properly write to Buffer::str char by char so that process B can call SHM::read() to retrieve it?

Does buffer.str.load(std::memory_order_relaxed) actually load atomically and correctly? I doubt that as it doesn’t even know the length.

This is for Linux, X86-64, GCC 7.

Thanks in advance.


Answer

Here is a working sketch for the single-producer-single-consumer case (it doesn’t matter whether the producer and consumer threads belong to the same process or not). It is wait-free:

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <fcntl.h>
#include <algorithm> // std::min
#include <utility>
#include <cstdint>   // uint64_t
#include <cstring>
#include <cstdio>    // printf
#include <string>
#include <atomic>

class SingleProducerSingleConsumerIndexes {
    std::atomic<uint64_t> produced_ = {};
    std::atomic<uint64_t> consumed_ = {};

public: // Producer interface.
    uint64_t produced() {
        auto consumed = consumed_.load(std::memory_order_acquire); // Synchronizes with store 2.
        auto produced = produced_.load(std::memory_order_relaxed);
        if(produced != consumed || !produced)
            return produced;
        // Entire buffer was consumed. Rewind.
        produced_.store(0, std::memory_order_release); // Store 1.
        consumed_.store(0, std::memory_order_relaxed); // Store 3.
        return 0;
    }

    void produce(uint64_t end) {
        produced_.store(end, std::memory_order_release); // Store 1.
    }

public: // Consumer interface.
    std::pair<uint64_t, uint64_t> available() const {
        auto produced = produced_.load(std::memory_order_acquire); // Synchronizes with store 1.
        auto consumed = consumed_.load(std::memory_order_relaxed);
        // min handles the case of store 3 not visible yet.
        return {std::min(produced, consumed), produced};
    }

    void consume(uint64_t end) {
        consumed_.store(end, std::memory_order_release); // Store 2.
    }
};

class SharedMemoryStrings {
    void* p_;
    static constexpr int size = 4 * 1024 * 1024;
    static constexpr int buffer_size = size - sizeof(SingleProducerSingleConsumerIndexes);
public:
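    SharedMemoryStrings() {
        auto handle = ::shm_open("/another-test", O_RDWR|O_CREAT, 0666);
        if(handle == -1)
            throw; // No active exception here, so a bare throw calls std::terminate().
        if(-1 == ::ftruncate(handle, size))
            throw;
        p_ = ::mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);
        ::close(handle); // The mapping stays valid after the descriptor is closed.
        if(p_ == MAP_FAILED)
            throw;
    }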

    ~SharedMemoryStrings() {
        ::munmap(p_, size);
    }

    void produce(std::string const& s) {
        auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
        auto produced = indexes->produced();
        uint64_t new_end = produced + sizeof(uint64_t) + s.size();
        if(new_end > buffer_size)
            throw; // Out of buffer space.

        auto* buffer = reinterpret_cast<char*>(indexes + 1) + produced;
        uint64_t size = s.size();
        memcpy(buffer, &size, sizeof size);
        buffer += sizeof size;
        memcpy(buffer, s.data(), s.size());

        indexes->produce(new_end);
    }

    bool try_consume(std::string& s) {
        auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
        auto available = indexes->available();
        auto consumed = available.first;
        auto produced = available.second;
        if(consumed == produced)
            return false; // No data available.

        auto* buffer = reinterpret_cast<char const*>(indexes + 1) + consumed;
        uint64_t size;
        memcpy(&size, buffer, sizeof size);
        buffer += sizeof size;
        // Reuse the string to minimize memory allocations.
        s.assign(buffer, size);

        indexes->consume(consumed + sizeof(uint64_t) + size);
        return true;
    }
};

int main(int ac, char** av) {
    if(ac > 1) {
        // Producer.
        SharedMemoryStrings a;
        for(int i = 1; i < ac; ++i)
            a.produce(av[i]);
    }
    else {
        // Consumer.
        SharedMemoryStrings a;
        for(std::string s;;) { // Busy-wait loop.
            if(a.try_consume(s)) // Reuse the string to minimize memory allocations.
                printf("%s\n", s.c_str());
            // else // Potential optimization.
            //     _mm_pause();
        }
    }
}
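
The record layout that produce() and try_consume() rely on (an 8-byte length followed by the raw bytes, with no terminating NUL) can be tried out without any shared memory. The following is a minimal sketch under that assumption; append_record and read_record are hypothetical helper names used only for illustration, and a std::vector<char> stands in for the mapped region. memcpy is used for the length because a record can start at an offset that is not aligned for uint64_t.

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical helpers for illustration; they are not part of the classes above.

// Appends one length-prefixed record at offset `end`.
// Returns the new end offset, or `end` unchanged if the record does not fit.
uint64_t append_record(std::vector<char>& buf, uint64_t end, std::string const& s) {
    uint64_t size = s.size();
    uint64_t new_end = end + sizeof size + size;
    if(new_end > buf.size())
        return end; // Out of buffer space.
    std::memcpy(buf.data() + end, &size, sizeof size);           // Length prefix.
    std::memcpy(buf.data() + end + sizeof size, s.data(), size); // Payload bytes.
    return new_end;
}

// Reads the record that starts at `begin` into `s`.
// Returns the offset at which the next record starts.
uint64_t read_record(std::vector<char> const& buf, uint64_t begin, std::string& s) {
    uint64_t size;
    std::memcpy(&size, buf.data() + begin, sizeof size);
    s.assign(buf.data() + begin + sizeof size, size);
    return begin + sizeof size + size;
}
```

Because the length travels with the data, the reader never needs to scan for a terminator, and the offsets returned by these helpers play the role of the produced and consumed indexes.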

Notes:

  • Compile the code with g++ -o test -W{all,extra,error} -std=gnu++11 -O3 -DNDEBUG -march=native -pthread test.cc -lrt, assuming the source file is called test.cc. (-lrt goes after the source file so that the linker can resolve shm_open against it.)

  • Start the consumer with no arguments, ./test. The producer with arguments, like ./test hello world. The start order does not matter.

  • It is a single-producer-single-consumer solution. It is wait-free: producer and consumer calls complete in a bounded number of instructions, with no loops. That is a stronger guarantee than lock-free, which does not bound the number of instructions a call may take. It cannot go faster than that.

  • On x86-64 these acquire and release atomic loads and stores compile into plain mov instructions, because the x86-64 memory model is already strong enough to provide acquire and release ordering in hardware. Using std::atomic with explicit memory orders still matters: it ensures that the compiler does not reorder the surrounding instructions, and on architectures with weaker memory models, such as PowerPC, it makes the compiler insert the appropriate barriers, which volatile cannot possibly do. As far as the generated code goes, using volatile is the same as using std::memory_order_relaxed. See the assembly comparison.

  • produced_.store(end, std::memory_order_release); ensures that all previous stores (the memcpy calls into the shared memory) made by the producer thread become visible to the consumer thread as soon as the effect of this store is observed by produced_.load(std::memory_order_acquire);. See http://preshing.com/20130823/the-synchronizes-with-relation/ for a thorough treatment of the subject. The std::memory_order documentation says it best:

    memory_order_acquire A load operation with this memory order performs the acquire operation on the affected memory location: no reads or writes in the current thread can be reordered before this load. All writes in other threads that release the same atomic variable are visible in the current thread.

    memory_order_release A store operation with this memory order performs the release operation: no reads or writes in the current thread can be reordered after this store. All writes in the current thread are visible in other threads that acquire the same atomic variable and writes that carry a dependency into the atomic variable become visible in other threads that consume the same atomic.

  • The producer detects when the consumer has consumed all available data and, in that case, rewinds the buffer to the start. This avoids having to handle wrap-around the way a ring buffer would. If the consumer cannot process messages fast enough, the buffer will eventually fill up regardless.

  • It never calls the SingleProducerSingleConsumerIndexes constructor. It relies on the fact that a new file is zero-initialized, which is exactly what the constructor would do. In more complex scenarios the constructor of the shared data must be invoked if the file has just been created. That can be done by first creating a temporary file with a unique name (if the file does not exist yet), mapping it into memory and invoking the constructor, and then giving the temporary file its final name atomically. Note that plain rename() silently replaces an existing target; for this step to fail when the file already exists, use link() or, on Linux, renameat2() with RENAME_NOREPLACE. If the step fails because the file already exists, delete the temporary file and start again.

  • The consumer busy-waits for the lowest possible latency. If you would like the consumer to block while waiting, you can add a process-shared mutex and condition variable. It takes a few microseconds to wake up a thread waiting on a condition variable (a futex on Linux) in the kernel, though. That would also require calling the SingleProducerSingleConsumerIndexes constructor to do all the required initialization (e.g. initialize a robust adaptive process-shared mutex and a process-shared condition variable).
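
The release/acquire pairing described in the notes can be seen in isolation with two processes and one atomic. The following is a minimal sketch, not the code from the answer: Mailbox and publish_and_read are hypothetical names, and it assumes Linux, using an anonymous MAP_SHARED mapping plus fork() instead of shm_open() so that it fits in one program. The writer copies the bytes first and publishes the length with a release store; the reader spins on an acquire load and only then touches the bytes.

```cpp
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <new>
#include <string>

// Hypothetical layout for this sketch only: a published length plus a small payload.
struct Mailbox {
    std::atomic<uint32_t> length; // 0 means "nothing published yet".
    char text[64];
};

// Forks a writer process that publishes `message` and returns what the
// parent (the reader) observed. Returns an empty string on any failure.
std::string publish_and_read(char const* message) {
    size_t n = std::strlen(message);
    if(n == 0 || n > sizeof(Mailbox::text))
        return {};

    void* p = ::mmap(nullptr, sizeof(Mailbox), PROT_READ | PROT_WRITE,
                     MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if(p == MAP_FAILED)
        return {};
    auto* box = new (p) Mailbox{}; // Zero-initialized: length == 0.
    // Only lock-free atomics are usable across processes.
    assert(box->length.is_lock_free());

    pid_t pid = ::fork();
    if(pid < 0) {
        ::munmap(p, sizeof(Mailbox));
        return {};
    }
    if(pid == 0) {
        // Child (writer): fill the payload first, then publish its length.
        std::memcpy(box->text, message, n);
        box->length.store(static_cast<uint32_t>(n), std::memory_order_release);
        ::_exit(0);
    }

    // Parent (reader): spin until the release store becomes visible.
    uint32_t len;
    while((len = box->length.load(std::memory_order_acquire)) == 0) {
    }
    // The acquire load synchronizes with the release store, so text is complete.
    std::string result(box->text, len);

    ::waitpid(pid, nullptr, 0);
    ::munmap(p, sizeof(Mailbox));
    return result;
}
```

Calling publish_and_read("hello") should hand back the same string in the parent. If the two memory orders were relaxed instead, nothing would formally order the memcpy before the length becoming visible, which is the gap that the release store and acquire load close.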
