SimpleClientTransFile(Windows)の各バッファをリングバッファに置き換えます。
手順は以下の通りです。


1.送信用リングバッファへの対応
1.1.送信バッファ(m_pSendData)をリングバッファm_pCRingBuffSendに変更します。
1.2.送信データのセットSetSendData(char *pcData, int iSize)内でリングバッファへの
  書き込みWrite(LPBYTE pbBuf, long lSize)を使用するように変更します。
1.3 送信データ送信データの取得GetSendData(char **ppcData)内でリングバッファからの
  読み込みRead(LPBYTE pbBuf, long lSize)を使用するように変更します。

2.受信用リングバッファへの対応
2.1.受信データ格納用リングバッファm_pCRingBuffRecvを追加します。
2.2 受信用リングバッファに空きがあるときに受信処理を行いリングバッファに格納します。
  受信データが受信バッファ(szRecvBuffer)に格納されたら、すぐにリングバッファに
  書きこみます。
2.3 受信データの解析は、受信リングバッファに格納されているデータに対して実施します。
  AnalyzeDataRecvでは格納されているサイズを調べてヘッダ分あれば、仮読みしてコマンド
  を調べRecvMessagePacketに渡します。
  RecvMessagePacketでは、データサイズを調べサイズ分受信をしていたら読み込み処理を
  行います。
(*)AnalyzeDataRecv、RecvMessagePacketの返値を次のように変更します。
 0:パケットが揃っていない
 1:パケットを受信したので処理を行った
 -1:エラーが発生した

前回作成したリングバッファのクラス
CRingBuff(RingBuff.h、RingBuff.cpp)を
プロジェクトに追加します。

SendRecvThread.hを見てみましょう。
変更箇所に★と変更項目番号を記述しました。

【SendRecvTread.h】 #pragma once #include “ThreadJob.h” #include “define.h” // ★2.3.RecvMessagePacketの引数のため class CMySyncObject; // このクラスの使用することを宣言 class CRingBuff; // ★CRingBuffを使用するため #define SEND_BUFF_SIZE (1024 * 64) // ★1.1.送信リングバッファのサイズ #define SENDBUFSIZE (1024 + sizeof(HeaderRec)) // ★1.1.一度に送信するサイズの最大値(共通ヘッダ分を加えてあります) #define RECV_BUFF_SIZE (1024 * 64) // ★1.2.受信リングバッファのサイズ #define RCVBUFSIZE (1024 * 2) // ★1.2.一度に読む最大受信サイズ typedef struct { SOCKET fdClient; // 接続済みソケット(connectの結果) CMySyncObject *pCMySyncObject; // 同期オブジェクト } ConnectionInfoRec; class CSendRecvThread : public CThreadJob { public: CSendRecvThread(ConnectionInfoRec *pConInfo); // パラメータをコンストラクタで渡す ~CSendRecvThread(); // 基底クラスの関数をオーバーライドする // C++11で明示的にoverrideを書くことが出来るようになりました // 基底クラスの当該関数にvirtualが書いていないとエラーを出してくれます UINT DoWork() override; // DoSendで実施している内容を記述 BOOL SetSendData(char *pcData, int iSize); // 送信データの設定 BOOL IsZombie(); // このスレッドはゾンビ状態か private: ConnectionInfoRec *m_pConInfo; // コンストラクタで渡されるパラメータを格納 // このスレッド実行中領域が確保されていること BOOL m_fIamZombie; // ゾンビ状態かどうかを保持 CRingBuff *m_pCRingBuffSend; // ★1.1.送信データ格納用リングバッファ CRingBuff *m_pCRingBuffRecv; // ★2.1.受信データ格納用リングバッファ int GetSendData(char **ppcData); // 送信データの取得 int AnalyzeDataRecv(); // ★2.3.引数と返値を変更 int RecvMessagePacket(HeaderRec *pHeader); // ★2.3.引数と返値を変更 };

SendRecvThread.cppを見てみましょう。
変更箇所に★と変更項目番号を記述しました。

【SendRecvTread.h】 #include “SendRecvThread.h” #include “MySyncObject.h” // CMySyncObjectを使うため #include “RingBuff.h” // ★CRingBuffを使うため //============================================== // function // コンストラクタ // parameter // ConnectionInfoRec *pConInfo [in]機能に必要な情報 // return // なし //============================================== CSendRecvThread::CSendRecvThread(ConnectionInfoRec *pConInfo) { m_pConInfo = pConInfo; m_fIamZombie = FALSE; m_pCRingBuffSend = new CRingBuff(SEND_BUFF_SIZE); // ★1.1.送信リングバッファの構築 m_pCRingBuffRecv = new CRingBuff(RECV_BUFF_SIZE); // ★2.1.受信リングバッファの構築 } //============================================== // function // デストラクタ // parameter // なし // return // なし //============================================== CSendRecvThread::~CSendRecvThread() { SAFE_DELETE(m_pCRingBuffSend) // ★1.1.送信リングバッファの破棄 SAFE_DELETE(m_pCRingBuffRecv) // ★2.1.受信リングバッファの破棄 } //============================================== // function // 機能を記述した関数(DoSend, DoRecvの内容を記述する) // parameter // なし // return // 0:正常 -1:エラー発生 //============================================== UINT CSendRecvThread::DoWork() { BOOL fRet = TRUE; fd_set wfds, rfds; struct timeval tv; char *pcData = NULL; // 未送信データ int iSendSize = 0; // 未送信データサイズ char szRecvBuffer[RCVBUFSIZE]; // ★2.2.受信データを一時的に格納 int iRecvSize; while (!m_fStopFlag) { tv.tv_sec = 0; tv.tv_usec = 10 * 1000; // 10msec FD_ZERO(&rfds); FD_ZERO(&wfds); // 受信データ検査用fd_setは常にセットする FD_SET(m_pConInfo->fdClient, &rfds); // 未送信データがあるときだけ送信可能検査用fd_setにセットする iSendSize = m_pCRingBuffSend->GetReadableSize(); // ★1.3.送信データがあるかリングバッファを調べる if (iSendSize > 0) FD_SET(m_pConInfo->fdClient, &wfds); select(FD_SETSIZE, &rfds, &wfds, NULL, &tv); // タイムアウトまでSleepと同等 // 受信処理(DoRecv) if (FD_ISSET(m_pConInfo->fdClient, &rfds)) // 受信データがあればrecv実施 { // ★2.2.受信リングバッファに空きがあれば取得する // ここで受信しなければ、次回のFD_ISSETで受信データありがセットされるので // 次回に取得することができる iRecvSize = min(m_pCRingBuffRecv->GetWriteableSize(), RCVBUFSIZE); if (iRecvSize > 0) { if ((iRecvSize = recv(m_pConInfo->fdClient, szRecvBuffer, iRecvSize, 0)) <= 0) { m_pConInfo->pCMySyncObject->Lock(); if (iRecvSize == 0) DispErrorMsg(“Disconnected recv”); else DispErrorMsg(“Err:recv”); m_pConInfo->pCMySyncObject->UnLock(); fRet = FALSE; break; } else { // ★2.2.取得したデータすぐに受信リングバッファに書きこむ // 受信リングバッファに書き込むのはこのスレッドだけなので、すべて書き込めるはず m_pCRingBuffRecv->Write((LPBYTE)szRecvBuffer, iRecvSize); } } } // ★2.3.受信リングバッファに格納されているデータの解析を行う // 複数のパケットが格納されている可能性があるので、FD_ISSETの結果とは無関係に // 解析を行うようにする if (AnalyzeDataRecv() == -1) // ★2.3.返値が-1の時がエラー { m_pConInfo->pCMySyncObject->Lock(); DispErrorMsg(“Err:Packet format”); m_pConInfo->pCMySyncObject->UnLock(); fRet = FALSE; break; } // 送信処理(DoSend) if (FD_ISSET(m_pConInfo->fdClient, &wfds)) // 送信可能ならsend実施 { // ★1.3.送信したいデータの取得(PATH_MTUより小さくなるように取得する) iSendSize = GetSendData(&pcData); if (send(m_pConInfo->fdClient, pcData, iSendSize, 0) != iSendSize) { DispErrorMsg(“Err:send”); fRet = FALSE; break; } SAFE_FREE(pcData) } } m_pConInfo->pCMySyncObject->Lock(); m_fIamZombie = TRUE; m_pConInfo->pCMySyncObject->UnLock(); SAFE_FREE(pcData) return((fRet == TRUE) ? 0 : -1); } //============================================== // function // 送信データの設定 // parameter // char *pcData [in]送信データ // int iSize [in]データ長 // return // TRUE/FALSE //============================================== BOOL CSendRecvThread::SetSendData(char *pcData, int iSize) { BOOL fRet = FALSE; // ★1.2.送信リングバッファに空きがないときは書き込まない fRet = m_pCRingBuffSend->Write((LPBYTE)pcData, iSize); return(fRet); } //============================================== // function // 送信データの取得 // parameter // char **ppcData [in/out]送信データ // return // データ長 //============================================== int CSendRecvThread::GetSendData(char **ppcData) { int iSize = 0; // ★1.3.送信データがあるかリングバッファのデータサイズを調べる if ((iSize = m_pCRingBuffSend->GetReadableSize()) > 0) { // 送信サイズをPATH_MTUより小さくしておく(1024+共通ヘッダなら大丈夫) iSize = min(iSize, SENDBUFSIZE); *ppcData = (char *)calloc(iSize, sizeof(char)); iSize = m_pCRingBuffSend->Read((LPBYTE)*ppcData, iSize); } return(iSize); } //============================================== // function // このスレッドはゾンビ状態か // 別スレッドから参照される // parameter // なし // return // 0:正常 -1:エラー発生 //============================================== BOOL CSendRecvThread::IsZombie() { BOOL fRet; m_pConInfo->pCMySyncObject->Lock(); fRet = m_fIamZombie; m_pConInfo->pCMySyncObject->UnLock(); return(fRet); } //============================================== // function // 受信データの解析 // 受信リングバッファに格納されているデータを調べる // parameter // なし // retun ★2.3. // 0:パケットが揃っていない // 1:パケットを受信したので処理を行った // -1:エラーが発生した //============================================== int CSendRecvThread::AnalyzeDataRecv() { int iRet = 0; HeaderRec Header; WORD wCmd; // ★2.3.データサイズを調べる int iSize = m_pCRingBuffRecv->GetReadableSize(); if (iSize < sizeof(HeaderRec)) // ★2.3.ヘッダサイズに満たないときは何もしない goto L_END; // ★2.3.ヘッダ部を借り読み込みする m_pCRingBuffRecv->ReadWithoutUpdateHeadPoint((LPBYTE)&Header, sizeof(HeaderRec)); // ヘッダ部の解析 if (memcmp(Header.bMagicData, MAGIC_STRING, strlen(MAGIC_STRING)) != 0) { iRet = -1; // 識別子が違うのでエラー goto L_END; } wCmd = ntohs(Header.wCommand); fprintf(stderr, “CMD:%d\n”, wCmd); switch (wCmd) { case CMD_MSG_DATA: iRet = RecvMessagePacket(&Header); break; default: // 知らないコマンドなのでエラー iRet = -1; break; } L_END: return(iRet); } //============================================== // function // メッセージコマンドの受信 // parameter // HeaderRec *pHeader [in]仮読みしたヘッダ // retun ★2.3. // 0:パケットが揃っていない // 1:パケットを受信したので処理を行った // -1:エラーが発生した //============================================== int CSendRecvThread::RecvMessagePacket(HeaderRec *pHeader) { int iRet = 0; MsgDataRec *pMsgData = NULL; int iMsgSize, iSize; char *pszMsg = NULL; LPBYTE pbDest = NULL; iMsgSize = ntohs(pHeader->wDataLen); // ★2.3.データが足りないときは何もしない iSize = m_pCRingBuffRecv->GetReadableSize(); if (iSize < iMsgSize + sizeof(HeaderRec)) goto L_END; // ★2.3.パケット全体を受信しているので読み込みを実施する pMsgData = (MsgDataRec *)calloc(iMsgSize + sizeof(HeaderRec), sizeof(BYTE)); m_pCRingBuffRecv->Read((LPBYTE)pMsgData, iMsgSize + sizeof(HeaderRec)); // NULLターミネート分を追加して確保 pszMsg = (char *)calloc(iMsgSize + 1, sizeof(char)); memcpy(pszMsg, pMsgData->bMsgData, iMsgSize); // UTF-8で受信したので、SJISに変換して表示する ConvUtf8toSJis((LPBYTE)pszMsg, NULL, &iSize); pbDest = (LPBYTE)calloc(iSize, sizeof(BYTE)); ConvUtf8toSJis((LPBYTE)pszMsg, pbDest, &iSize); m_pConInfo->pCMySyncObject->Lock(); fprintf(stderr, “Msg:recv %s\n”, pbDest); m_pConInfo->pCMySyncObject->UnLock(); SAFE_FREE(pMsgData) SAFE_FREE(pbDest) SAFE_FREE(pszMsg) iRet = 1; L_END: return(iRet); }

これで、リングバッファに対応したSimpleClientTransFile(Windows)
完成しました。
Linux版への対応も同様にできます(SimpleClientTransFile(Linux))。
次回はSimpleServerTransFile(Linux)をリングバッファ対応にします。