送受信スレッド(CSendRecvThread)の送信リングバッファにデータを書くとTCPスタックが
送信可能であれば、無制限にデータをネットワークに送出します。
このような動作では、ネットワークのトラフィックが増大し、輻輳が発生する可能性が
あります。
送信側は、送信帯域を制限してデータを送出する責任があります。
今回はこの機能(送信速度の制限)を実装することにします。
具体的には、適切な間隔を空けてデータを送出するようにします。


パケットサイズを1,000Bytesとして、送信帯域が1Mbpsを超えないようしようとすると
送信帯域:1,000,000 bits/secをByte単位に換算すると125,000 bytes/secです。
1,000Bytesずつ送信するとして1秒間の送信回数を求めると
125,000 /1,000 = 125 回送信/sec
つまり送信間隔(次回パケット送信までの時間)は
1,000 / 125 = 8msec
となるので、1,000Bytes送信後8msec待って送信すれば1Mbpsで送信することになります。
(*)実際にはTCPヘッダ、IPヘッダ、イーサーヘッダなどが追加されるので
 きっちりにはなりませんが、誤差の範囲と考えることにしましょう。

送信帯域の制限をするには、次の2つの方法が考えられます。
1.TCPスタックが送信可能かつ前回の送信時刻から8msec経過してたら送信するという動作。
 これは全体の送信帯域を制限するという考え方です。
2.送信リングバッファが空いているかつ前回書き込みから8msec経過してたら書き込む。
 送信バッファにデータがあり送信可能であれば送信する。
 これは、個別データ(音声と映像など)ごとに送信帯域を制限するという考え方です。
今回は1.でやってみることにしましょう

さて8msecを調べるためのタイマーは何を使うべきでしょう。
Windowsには次のようなタイマーがあります。

タイマの分解能について 関数名 分解能 最大値 変数の型 clock 15[ミリ秒] 2^31[ミリ秒] clock_t(32bits符号付き整数) 前後 (約24.86[日]) GetTickCount 15[ミリ秒] 2^32[ミリ秒] DWORD(32bits符号なし整数) 前後 (約49.71[日]) timeGetTime 1[ミリ秒] 2^32[ミリ秒] DWORD(32bits符号なし整数) (約49.71[日])

使えそうなのはtimeGetTimeですね。今回は、Windowsではこれを使うことにします。
(*)他にも使えそうなのはPerformanceCounter、Waitable Timerがあります。
(*)timeGetTimeを使うにはライブラリWinmm.libが必要です。
 stdThread.hに使用することを記述してあります。

(*)LinuxにはtimeGetTimeはありませんが、stdThread.cppにgettimeofdayを使って
 timeGetTimeを作ってあります。
 これも分解能1msecで動作します。
以下に分解能を調べるテストプログラムを載せておきます。参考にしてください。

(*)LinuxにはtimeGetTimeはありませんが、stdThread.cppにgettimeofdayを使って
 timeGetTimeを作ってあります。
 これも分解能1msecで動作します。
以下に分解能を調べるテストプログラムを載せておきます(TestTimerWinTestTimerLinux)。
参考にしてください。
(*)WindowsではtimeGetTimeをコールする前後で分解能の設定と解除を行うようにします。
 timeBeginPeriod(1);
 timeGetTime();
 timeEndPeriod(1);

送信帯域の制御をSimpleClientTransFile for Winへの組み込むことにしましょう
SendRecvThread.h送信した時刻次回送信するまでの間隔を格納するメンバ変数を
宣言します。
次回送信するまでの間隔の計算送信時刻になっているかの検査を行うメソッド関数を
宣言します。
(*)1Mbpsを超える速度で送信する場合は、一度に送信するサイズを大きくします。
IPv6ではこのとき、パケットの分割と結合はエンドポイントで行われます。

【SendRecvThread.h】 #pragma once #include “ThreadJob.h” #include “define.h” // RecvMessagePacketの引数のため class CMySyncObject; // このクラスの使用することを宣言 class CRingBuff; // CRingBuffを使用するため #define SEND_BUFF_SIZE (1024 * 64) // 送信リングバッファのサイズ #if 1 #define SENDBUFSIZE (1024 + sizeof(HeaderRec)) // 一度に送信するサイズの最大値(共通ヘッダ分を加えてあります) #else #define SENDBUFSIZE (1024 * 4) // ★TCPで送信速度を1Mbpsより上げたいときは断片化処理をTCPのスタックに任せる // スタック(send)に渡すサイズを増やす #endif #define RECV_BUFF_SIZE (1024 * 64) // 受信リングバッファのサイズ #define RCVBUFSIZE (1024 * 2) // 一度に読む最大受信サイズ typedef struct { SOCKET fdClient; // 接続済みソケット(connectの結果) CMySyncObject *pCMySyncObject; // 同期オブジェクト } ConnectionInfoRec; class CSendRecvThread : public CThreadJob { public: CSendRecvThread(ConnectionInfoRec *pConInfo); // パラメータをコンストラクタで渡す ~CSendRecvThread(); // 基底クラスの関数をオーバーライドする // C++11で明示的にoverrideを書くことが出来るようになりました // 基底クラスの当該関数にvirtualが書いていないとエラーを出してくれます UINT DoWork() override; // DoSendで実施している内容を記述 BOOL SetSendData(char *pcData, int iSize); // 送信データの設定 BOOL IsZombie(); // このスレッドはゾンビ状態か private: ConnectionInfoRec *m_pConInfo; // コンストラクタで渡されるパラメータを格納 // このスレッド実行中領域が確保されていること BOOL m_fIamZombie; // ゾンビ状態かどうかを保持 CRingBuff *m_pCRingBuffSend; // 送信データ格納用リングバッファ CRingBuff *m_pCRingBuffRecv; // 受信データ格納用リングバッファ DWORD m_dwPrevSentTime; // ★前回送信した時刻(msec) DWORD m_dwSendInterval; // ★次回送信までの間隔(msec) int GetSendData(char **ppcData); // 送信データの取得 int AnalyzeDataRecv(); // 引数と返値を変更 int RecvMessagePacket(HeaderRec *pHeader); // 引数と返値を変更 DWORD CalcNextSendInterval(int iSentSize); // ★次回送信までの間隔 BOOL CanSendNow(DWORD dwNow); // ★送信して良い時刻になったか };

SendRecvThread.cppのDoWorkで送信チェックをするかどうかの判断に送信時刻になったか
どうかのチェックを追加します。

【SendRecvThread.cpp】 #include “SendRecvThread.h” #include “MySyncObject.h” // CMySyncObjectを使うため #include “RingBuff.h” // CRingBuffを使うため // 送信速度 #define SEND_BPS (1000000.0) // ★送信速度1Mbps //———————————————- // function // コンストラクタ // parameter // ConnectionInfoRec *pConInfo [in]機能に必要な情報 // return // なし //———————————————- CSendRecvThread::CSendRecvThread(ConnectionInfoRec *pConInfo) { m_pConInfo = pConInfo; m_fIamZombie = FALSE; m_pCRingBuffSend = new CRingBuff(SEND_BUFF_SIZE); // 送信リングバッファの構築 m_pCRingBuffRecv = new CRingBuff(RECV_BUFF_SIZE); // 受信リングバッファの構築 m_dwPrevSentTime = 0; // ★初回送信はすぐに送信するように m_dwSendInterval = 0; // ★ } CSendRecvThread::~CSendRecvThread() 変更なし //———————————————- // function // 機能を記述した関数(DoSend, DoRecvの内容を記述する) // parameter // なし // return // 0:正常 -1:エラー発生 //———————————————- UINT CSendRecvThread::DoWork() { BOOL fRet = TRUE; fd_set wfds, rfds; struct timeval tv; char *pcData = NULL; // 未送信データ int iSendSize = 0; // 未送信データサイズ char szRecvBuffer[RCVBUFSIZE]; // 受信データを一時的に格納 int iRecvSize; DWORD dwNow; // ★送信チェックした時刻を覚えるため while (!m_fStopFlag) { tv.tv_sec = 0; tv.tv_usec = 10 * 1000; // 10msec FD_ZERO(&rfds); FD_ZERO(&wfds); // 受信データ検査用fd_setは常にセットする FD_SET(m_pConInfo->fdClient, &rfds); // 送信データがあるかリングバッファを調べる iSendSize = m_pCRingBuffSend->GetReadableSize(); // ★チェックのために現在時刻を取得する timeBeginPeriod(1); // タイマーの最小精度を1msecにする dwNow = timeGetTime(); timeEndPeriod(1); // タイマーの最小精度を戻す // ★送信可能時刻かつ未送信データがあるときだけ送信可能検査用fd_setにセットする if ((iSendSize > 0) && (CanSendNow(dwNow) == TRUE)) FD_SET(m_pConInfo->fdClient, &wfds); select(FD_SETSIZE, &rfds, &wfds, NULL, &tv); // タイムアウトまでSleepと同等 // 受信処理(DoRecv) if (FD_ISSET(m_pConInfo->fdClient, &rfds)) // 受信データがあればrecv実施 { // 受信リングバッファに空きがあれば取得する // ここで受信しなければ、次回のFD_ISSETで受信データありがセットされるので // 次回に取得することができる iRecvSize = min(m_pCRingBuffRecv->GetWriteableSize(), RCVBUFSIZE); if (iRecvSize > 0) { if ((iRecvSize = recv(m_pConInfo->fdClient, szRecvBuffer, iRecvSize, 0)) <= 0) { m_pConInfo->pCMySyncObject->Lock(); if (iRecvSize == 0) DispErrorMsg(“Disconnected recv”); else DispErrorMsg(“Err:recv”); m_pConInfo->pCMySyncObject->UnLock(); fRet = FALSE; break; } else { // 取得したデータすぐに受信リングバッファに書きこむ // 受信リングバッファに書き込むのはこのスレッドだけなので、すべて書き込めるはず m_pCRingBuffRecv->Write((LPBYTE)szRecvBuffer, iRecvSize); } } } // 受信リングバッファに格納されているデータの解析を行う // 複数のパケットが格納されている可能性があるので、FD_ISSETの結果とは無関係に // 解析を行うようにする if (AnalyzeDataRecv() == -1) // 返値が-1の時がエラー { m_pConInfo->pCMySyncObject->Lock(); DispErrorMsg(“Err:Packet format”); m_pConInfo->pCMySyncObject->UnLock(); fRet = FALSE; break; } // 送信処理(DoSend) if (FD_ISSET(m_pConInfo->fdClient, &wfds)) // 送信可能ならsend実施 { // 送信したいデータの取得(PATH_MTUより小さくなるように取得する) iSendSize = GetSendData(&pcData); if (send(m_pConInfo->fdClient, pcData, iSendSize, 0) != iSendSize) { DispErrorMsg(“Err:send\n”); fRet = FALSE; break; } SAFE_FREE(pcData) // ★送信時刻と次回送信までの時間をセット m_dwPrevSentTime = dwNow; m_dwSendInterval = CalcNextSendInterval(iSendSize); } } m_pConInfo->pCMySyncObject->Lock(); m_fIamZombie = TRUE; m_pConInfo->pCMySyncObject->UnLock(); SAFE_FREE(pcData) return((fRet == TRUE) ? 0 : -1); } BOOL CSendRecvThread::SetSendData(char *pcData, int iSize) 変更なし int CSendRecvThread::GetSendData(char **ppcData) 変更なし BOOL CSendRecvThread::IsZombie() 変更なし int CSendRecvThread::AnalyzeDataRecv() 変更なし int CSendRecvThread::RecvMessagePacket(HeaderRec *pHeader) 変更なし //———————————————- // function // ★次の送信までの間隔(msec)を求める // parameter // int iSentSize [in]送信したサイズ // return // msec //———————————————- DWORD CSendRecvThread::CalcNextSendInterval(int iSentSize) { DWORD dwInterval = 0; double dbBytePerSec = SEND_BPS / 8.0; double dbCountPerSec; if (iSentSize == 0) goto L_END; dbCountPerSec = dbBytePerSec / (double)iSentSize; dwInterval = DWORD(1000.0 / dbCountPerSec); L_END: return(dwInterval); } //———————————————- // function // ★今送信可能時刻か // parameter // DWORD dwNow [in]現在時刻 // return // TRUE/FALSE //———————————————- BOOL CSendRecvThread::CanSendNow(DWORD dwNow) { BOOL fRet = FALSE; if (GetdwInterval(dwNow, m_dwPrevSentTime) >= m_dwSendInterval) fRet = TRUE; return(fRet); }

Linux版クライアント、サーバにも同様に送信制御を組み込むことができます。
送信帯域の制御を組み込んだSimpleClientTransFile for Win
送信帯域の制御を組み込んだSimpleClientTransFile for Linux
送信帯域の制御を組み込んだSimpleServerTransFile for Win
送信帯域の制御を組み込んだSimpleServerTransFile for Linux