Skip to content
Advertisement

Parallel TCP connection using threads

I am trying to build a system that opens parallel TCP sockets using threads. My threads are triggered using message queue IPC , thus every time a packet arrive to the message queue a thread “wakes up” , open TCP connection with remote server and send the packet. My problem is that in Wireshark , I can see the the time it takes to send a file is smaller using threads instead of one connection , but the throughput does not change.
My questions are :

  1. How can i verify my threads working parallel?
  2. How can i improve this code?, 3.How can i open several sockets using one thread?

I am using Virtual machine to run the multithreaded clients. The IDE I am using is Clion , language is C. My code:

#include<stdio.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<string.h>
#include <arpa/inet.h>
#include <unistd.h> // for close
#include<pthread.h>
#include <math.h>
#include<malloc.h>
#include<signal.h>
#include<stdbool.h>
#include<sys/types.h>
#include<linux/if_packet.h>
#include<netinet/in.h>
#include<netinet/if_ether.h>    // for ethernet header
#include<netinet/ip.h>      // for ip header
#include<netinet/udp.h>     // for udp header
#include<netinet/tcp.h>
#include <byteswap.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <assert.h>
#include <time.h>
#define QUEUE_NAME "/ServerDan_Queue"
#define QUEUE_PERM 0660
#define MAX_MESSAGES 10 //Max size = 10
#define MAX_MSG_SIZE 4105 //Max size = 8192B
#define MSG_BUFFER_SIZE MAX_MSG_SIZE+10
#define BSIZE 1024
#define Nbytes 4096
#define ElorServer_addr "192.168.1.54"
///params:
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
struct sockaddr_in server;
struct stat obj;
int sock;
int k, size, status;
int i = 0;

typedef struct frag
{
    int packet_number;
    int seq;
    uint8_t data[4096];
    bool lastfrag;
} fragma;

void * middlemanThread(void *arg)
{
    ///========================================///
    ///**** Waiting for message queue trigger!:///
    ///=======================================///
    long id = (long)arg;
    id+=1;
    mqd_t qd; //queue descriptor
    //open the queue for reading//
    qd= mq_open(QUEUE_NAME,O_RDONLY);
    assert(qd != -1);
    struct mq_attr attr;
    assert(mq_getattr(qd,&attr) != -1);
    uint8_t *income_buf = calloc(attr.mq_msgsize,1);
    uint8_t *cast_buf = calloc(attr.mq_msgsize,1);
    assert(income_buf);
    fragma frag;
    struct timespec timeout;
    clock_gettime(CLOCK_REALTIME,&timeout);
    timeout.tv_sec+=50;
    //bool closesoc =false;
    printf("Waiting for messages ..... nn");
    while(1){
        ///========================================///
        ///**** Open message queue fo receive:///
        ///=======================================///

        if((mq_timedreceive(qd,income_buf,attr.mq_msgsize,0,&timeout))<0){
            printf("Failed to receive message for 50 sec n");
            //closesoc =true;
            pthread_exit(NULL);
        }
        else{
            cast_buf = income_buf;
            printf("Received successfully , your msg :n");
            frag.packet_number = *cast_buf;
            cast_buf = (cast_buf + sizeof(int));
            frag.seq = *cast_buf;
            cast_buf = (cast_buf + sizeof(int));
            memccpy(frag.data,((fragma*)cast_buf)->data,0,Nbytes);
            cast_buf = cast_buf + Nbytes;
            frag.lastfrag = *cast_buf;
            uint8_t * data = frag.data;
        }
        pthread_mutex_lock(&lock);

        ///========================================///
        ///**** Connecting to Server and send Frament:///
        ///=======================================///

        int size = sizeof(( fragma *)income_buf)->packet_number + sizeof(( fragma *)income_buf)->seq + sizeof(( fragma *)income_buf)->data + sizeof(( fragma *)income_buf)->lastfrag;
        printf("In threadn");
        int clientSocket;
        struct sockaddr_in serverAddr;
        socklen_t addr_size;

        // Create the socket.
        clientSocket = socket(PF_INET, SOCK_STREAM, 0);

        //Configure settings of the server address
        // Address family is Internet
        serverAddr.sin_family = AF_INET;

        //Set port number, using htons function
        serverAddr.sin_port = htons(8081);

        //Set IP address to localhost
        serverAddr.sin_addr.s_addr = inet_addr("192.168.14.149");
        memset(serverAddr.sin_zero, '', sizeof serverAddr.sin_zero);

        //Connect the socket to the server using the address
        addr_size = sizeof serverAddr;
        connect(clientSocket, (struct sockaddr *) &serverAddr, addr_size);
        if(send(clientSocket , income_buf , size,0) < 0)
        {
            printf("Send failedn");
        }
        printf("Trhead Id : %ld n" , id);
        printf("Packet number : %d n Seq = %d  n lasfrag = %dnn",frag.packet_number,frag.seq,(int)frag.lastfrag);
        pthread_mutex_unlock(&lock);
        //if(closesoc)
        close(clientSocket);
        usleep(20000);
    }
}
int main(){
    int i = 0;
    pthread_t tid[5];

    while(i< 5)
    {
        if( pthread_create(&tid[i], NULL, middlemanThread, (void*)i) != 0 )
            printf("Failed to create threadn");
        i++;
    }
    sleep(2);
    i = 0;
    while(i< 5)
    {
        pthread_join(tid[i++],NULL);
        printf("Thread ID : %d:n",i);
    }
    return 0;
}

Advertisement

Answer

thus every time a packet arrive to the message queue a thread “wakes up” , open TCP connection with remote server and send the packet

If you’re at all concerned about speed or efficiency, don’t do this. The single most expensive thing you can do with a TCP socket is the initial connection. You’re doing a 3-way handshake just to send a single message!

Then, you’re holding a global mutex while doing this entire operation – which, again, is the slowest operation in your program.

The current design is effectively single-threaded, but in the most complicated and expensive possible way.

I can see the the time it takes to send a file is smaller using threads instead of one connection , but the throughput does not change

I have no idea what you’re actually measuring, and it’s not at all clear that you do either. What is a file? One fragment? Multiple fragments? How big is it compared to your MTU? Have you checked that the fragments are actually received in the correct order, because it looks to me like the only possible parallelism is the spot where that could break.

How is it possible to have lower latency and unchanged throughput for a single file?

How can i verify my threads working parallely?

If you see multiple TCP connections in wireshark with different source ports, and their packets are interleaved, you have effective parallelism. This is unlikely though as you explicitly prohibited it with your global mutex!

What is the best way to check the throughput in wireshark?

Don’t. Use wireshark for inspecting packets, use the server to determine throughput. That’s where the results actually matter.

3.Is the concept of parallel TCP suppose to increase the throughput?

Why did you implement all this complexity if you don’t know what it’s for?

There’s a good chance a single thread (correctly coded with no spurious mutex thrashing) can saturate your network, so: no. Having multiple I/O threads is generally about conveniently partitioning your logic and state (ie, having one client per thread, or different unrelated I/O subsystems in different threads), rather than performance.


If you want to pull packets off a message queue and send them to TCP, the performant way is to:

  1. use a single thread just doing this (your program may have other threads doing other things – avoid synchronizing with them if possible)
  2. open a single persistent TCP connection to the server and don’t connect/close it for every fragment
  3. that’s it. It’s much simpler than what you have and will perform much better.

You can realistically have one thread handling multiple different connections, but I can’t see any way this would be useful in your case, so keep it simple.

User contributions licensed under: CC BY-SA
7 People found this is helpful
Advertisement