2020年8月30日 星期日

簡單的 c++ 使用 multithread 的 localsocket


 共3個檔案: localSocket.h,   client.cpp,   server.cpp:
標頭檔 // localSocket.h
    #include <stdio.h>
    #include <string.h>
    #include <stddef.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <sys/un.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #ifndef  _default__port
    #define  _default__port  8080
    int aTcpServer (int port = _default__port, char *host=NULL, bool client = false) {
        int localSocket = socket(PF_INET, SOCK_STREAM, 0);
        if (localSocket >= 0){
            struct sockaddr_in endpoint;
            memset(&endpoint, 0, sizeof(endpoint));
            socklen_t aLen = sizeof(endpoint);
            endpoint.sin_family = AF_INET;
            endpoint.sin_port   = htons(port);
            endpoint.sin_addr.s_addr = host ?
                inet_addr(host) : 
                inet_addr("127.0.0.1");
            int result = 1;
            setsockopt(localSocket, SOL_SOCKET, SO_REUSEADDR, &result, sizeof(result));
            result = client ?
                connect(localSocket, (struct sockaddr *) &endpoint, aLen)         :
                bind(localSocket, (struct sockaddr *) &endpoint, aLen) < 0 ? -1 :
                listen(localSocket, 10) ;      
            if (result >= 0)  return localSocket;// sucess
            printf("connect result: %d\n", result);
            close(localSocket);     
        }
        printf("%s:%d socket fail!\n", host, port);
        exit(1);
    };   
    int aTcpSocket (char *host=NULL, int port = _default__port) {
        return aTcpServer(port, host, true);
    };   
    int aLocalServer (char *host = NULL, bool client = false) {
        const char *aName = host ? host : "com.example.testLocalSocket";
        int localSocket = socket(PF_UNIX, SOCK_STREAM, 0);
        if (localSocket >= 0) {
            struct sockaddr_un endpoint;
            memset(&endpoint, 0, sizeof(endpoint));
            socklen_t aLen = 1 + strlen(aName) + offsetof(struct sockaddr_un, sun_path);
            endpoint.sun_family  = AF_UNIX;
            endpoint.sun_path[0] = '\0';
            strcpy(endpoint.sun_path + 1, aName);
            int result = client ?
                connect(localSocket, (struct sockaddr *) &endpoint, aLen)         :
                bind(localSocket, (struct sockaddr *) &endpoint, aLen) < 0 ? -1 :
                listen(localSocket, 10); 
            if (result >= 0) return localSocket;// sucess
            printf("connect result: %d, fail!\n", result);
            close(localSocket);
        }
        printf("%s socket fail!\n", aName);
        exit(1);
    };   
    int aLocalSocket (char *host = NULL) {
        return aLocalServer(host, true);
    };
    #endif

客戶端: // client.cpp
    #include <random>
    #include "localSocket.h"
    using namespace std;
    int main( ) {
        random_device devSeed;
        static auto mt19937Gen  = mt19937(devSeed( ));
        static auto uniformP100 = uniform_int_distribution<int>(0, 99);
        auto randomNumberString = [ ]( ) {
            return to_string(uniformP100(mt19937Gen)).c_str( );
        };
        int localSocket = aLocalSocket( );
        const char *msg = randomNumberString( );
        write(localSocket, msg, strlen(msg));
        close(localSocket);
    }


伺服端 // server.cpp
    // g++  server.cpp  -pthread  -o  server
    #include <future>
    #include <thread>
    #include "localSocket.h"
    using namespace std;
    int main( ) {
        int localSocket = aLocalServer( );
        volatile static int socketNumber = 0;
        static int sockets[10];
        static future<long> futures[10];
        static FILE *fin = fopen("./localSocket.h","rb");
        static pthread_mutex_t f1mutext = PTHREAD_MUTEX_INITIALIZER;
        static pthread_mutex_t t1mutext = PTHREAD_MUTEX_INITIALIZER;
        while (true) { // Main thread
            int socketfd = accept(localSocket, NULL, NULL);
            if(socketfd < 0) continue;
            if(socketNumber == 10) { close(socketfd); continue; }      
            pthread_mutex_lock(&t1mutext);
            auto aPromise = promise<long>( );
            futures[socketNumber] = aPromise.get_future( );
            sockets[socketNumber] = socketfd;
            socketNumber ++;
            auto await = [socketfd]( ) {
                pthread_mutex_lock(&t1mutext);
                socketNumber --;
                for(int i = 0; i < socketNumber; i ++) if(sockets[i] == socketfd) {
                    swap(sockets[i] , sockets[socketNumber]);
                    swap(futures[i] , futures[socketNumber]);
                    break;
                }
                long value = futures[socketNumber].get( );
                pthread_mutex_unlock(&t1mutext);
                return value;
            };
  
            auto aThread = [socketfd, await](int buflen) {
                long limit = await( );
                long position = 0;
                char *buffer = (char *) malloc(buflen + 2);
                if (buffer == nullptr) {
                    close(socketfd);
                    return;
                }
                auto sprintfd = [socketfd] (char *buf, const char *fmt, long arg) {
                    sprintf(buf, fmt, arg);
                    write(socketfd, buf, strlen(buf));
                };
                int len = 0;
                fseek(fin, 0L, SEEK_END);
                long fileLength = ftell(fin);
                do { // read command from network
                    len = read(socketfd, buffer, buflen);
                    buffer[len] = 0;// append EOS
                    if(len > 0) printf("%s\n", buffer);          
                    if (strstr(buffer, "ftell=?")) {
                        sprintfd(buffer, "ftell=%ld", position);
                    } else if (strstr(buffer, "length=?")) {                          
                        sprintfd(buffer, "length=%ld", limit);
                    } else if (strstr(buffer, "fseek=")) {
                        sscanf(buffer, "fseek=%ld", &position);
                        if (position < 0) position = limit - 1;
                        sprintfd(buffer, "ftell=$ld", position);
                    } else if(strstr(buffer, "fread=")) {
                        sscanf(buffer, "fread=%d", &len);// re-use len
                        if(len > 0) {
                            pthread_mutex_lock(&f1mutext);
                            fseek(fin, position, SEEK_SET);
                            len = fread(buffer, 1, len, fin);
                            pthread_mutex_unlock(&f1mutext);
                            if (len > 0) {
                                write(socketfd, buffer, len);
                                position += len;
                            }
                        }
                    }
                } while (len > 0);// EOF
                free(buffer);              
                close(socketfd);
            };
            pthread_mutex_unlock(&t1mutext);
            aPromise.set_value(fileLength);
            thread(aThread, 1024 * 1024).detach( );// thread split
        }
        fclose(fin);
        close(localSocket);
    }

編譯並執行看看:
g++   server.cpp  -pthread  -o  server  &&  ./server &
g++   client.cpp   -o   client
./client    |    ./client   |   ./client   |   ./client   |    ./client   |    ./client   |    ./client   |    ./client

沒有留言:

張貼留言

使用 pcie 轉接器連接 nvme SSD

之前 AM4 主機板使用 pcie ssd, 但主機板故障了沒辦法上網, 只好翻出以前買的 FM2 舊主機板, 想辦法讓老主機復活, 但舊主機板沒有 nvme 的界面, 因此上網買了 pcie 轉接器用來連接 nvme ssd, 遺憾的是 grub2 bootloader 無法識...