Changeset 2101

Show
Ignore:
Timestamp:
06/16/10 15:11:16 (20 months ago)
Author:
mwhitworth
Message:

Add to TCP.

Location:
Whitix/branches/netchannel/user/sdk/network/tcp
Files:
11 modified

Legend:

Unmodified
Added
Removed
  • Whitix/branches/netchannel/user/sdk/network/tcp/tcp.c

    r2099 r2101  
    77#include <net/ipv4.h> 
    88#include <net/tcp.h> 
     9#include <limits.h> 
    910#include <errno.h> 
    1011#include <net/aio.h> 
     
    6768        pthread_mutex_init(&tcpSock->sendUpdate, NULL); 
    6869 
     70        /* Stats */ 
     71        tcpSock->stats.minWin = ULONG_MAX; 
     72 
    6973        /* Add to list, used when writing out profiling and statistics data */ 
    7074        if (!tcpHead) 
     
    100104static void TcpRecvAio(struct TcpSocket* tcpSock, char* data, int length) 
    101105{ 
     106        Socket* socket = tcpSock->socket; 
     107         
     108        while (socket->recvAio->flags & ASYNC_ACTIVE) 
     109                SysYield(); 
     110 
     111        _SockEventFireList(socket, socket->recvAio, data, length); 
    102112} 
    103113 
     
    186196        ulong seq = NetToHostLong(header->seqNum); 
    187197        ulong ack = NetToHostLong(header->ackNum); 
    188         ulong dataLen = length - TcpGetHeaderSize(header) - sizeof(struct IpHeader); 
     198        struct IpHeader* ipHeader = (struct IpHeader*)ChanRecvBufferData(tcpSock->channel, buffer); 
     199        ulong dataLen = length - TcpGetHeaderSize(header) - IpGetHeaderSize(ipHeader); 
    189200        ulong seqEnd = seq + dataLen; 
    190201        int err = TCP_ERROR; 
     
    277288        if (dataLen > 0) 
    278289        { 
    279                 ChanRecvBuffAddRead(tcpSock->channel, buffer, sizeof(struct IpHeader) + 
    280                         TcpGetHeaderSize(header)); 
     290                ChanRecvBuffAddRead(tcpSock->channel, buffer, IpGetHeaderSize(ipHeader) + TcpGetHeaderSize(header)); 
    281291 
    282292                tcpSock->stats.totalBytesRecv += dataLen; 
     
    305315                tcpSock->ackMissed++; 
    306316 
    307                 if (tcpSock->ackMissedBytes >= 3*tcpSock->mss || tcpSock->ackMissed > 0) 
     317//              if (TcpInSlowStart(tcpSock) || tcpSock->ackMissedBytes >= 3*tcpSock->mss || tcpSock->ackMissed > 3) 
    308318                { 
    309319                        tcpSock->ackMissed = 0; 
     
    318328                 
    319329                if (tcpSock->socket && tcpSock->socket->recvAio) 
    320                         TcpRecvAio(tcpSock, ChanRecvBufferData(tcpSock->channel, buffer), dataLen); 
     330                        TcpRecvAio(tcpSock, ChanRecvBufferData(tcpSock->channel, buffer) + IpGetHeaderSize(ipHeader) + TcpGetHeaderSize(header), dataLen); 
    321331        } 
    322332 
     
    351361 
    352362dropPacket: 
    353         tcpSock->stats.droppedPackets++; 
    354         tcpSock->stats.droppedBytes += dataLen; 
     363        TcpDropPacket(tcpSock, dataLen); 
    355364        return err; 
    356365} 
     
    695704{ 
    696705        struct TcpSocket* tcpSock = (struct TcpSocket*)(socket->priv); 
     706        struct HostEntry* entry; 
     707 
     708        if ((entry = HostFindEntry(TcpDestAddress(tcpSock), HOST_CLIENT))) 
     709        { 
     710                tcpSock->rto = entry->rto; 
     711                tcpSock->rtt = entry->rtt; 
     712                tcpSock->mdev = entry->mdev; 
     713        }else{ 
     714                tcpSock->rto = TCP_RTO_DEFAULT; 
     715        tcpSock->rtt = TCP_RTT_DEFAULT; 
     716                tcpSock->localThresh = 0x10000; 
     717        } 
    697718 
    698719        /* Window setup */ 
     
    704725        tcpSock->localCwndBytes = tcpSock->mss; 
    705726        tcpSock->prevUpRatio = 3; 
    706         tcpSock->localThresh = 0x10000; 
    707  
    708         /* Retx */ 
    709         tcpSock->rto = TCP_RTO_DEFAULT; 
    710  
    711         TCP_DEBUG(("mss (local) = %u", tcpSock->mss)); 
    712727 
    713728        if (TcpSendSyn(socket) < 0) 
     
    715730 
    716731        TcpSetTimer(tcpSock, TCP_SYN_TIMEOUT, TcpResendSyn); 
    717  
    718732        TcpSocketAdd(socket); 
    719733 
     
    728742 
    729743        /* Set up statistics */ 
     744        tcpSock->stats.destAddr = NetToHostLong(TcpDestAddress(tcpSock)); 
    730745        tcpSock->stats.destPort = TcpDestPort(tcpSock); 
    731746 
     
    738753        ChannelOptions options; 
    739754        struct TcpSocket* tcpSock = TcpInfo(socket); 
     755        struct Ipv4EndPoint* dest = (struct Ipv4EndPoint*)address; 
    740756 
    741757        chanSrc.address = 0; 
    742758        chanSrc.port = 0; /* IP channel code will sort out a port. */ 
    743759 
    744         options.flags = CHAN_KEEP_SEND_BUFFERS; 
     760        if (!dest->port) 
     761                return -1; 
     762 
     763        options.flags = CHAN_KEEP_SEND_BUFFERS | CHAN_RECORD_SEND_TIME; 
    745764        options.sendBuffers = 0; 
    746765        options.recvBuffers = 0; 
     
    769788        chanDest.port = 0; 
    770789 
    771         options.flags = CHAN_KEEP_SEND_BUFFERS; 
     790        options.flags = CHAN_KEEP_SEND_BUFFERS | CHAN_RECORD_SEND_TIME; 
    772791        options.sendBuffers = 0; 
    773792        options.recvBuffers = 0; 
     
    841860        srcIp.port = TcpSourcePort(tcpSock); 
    842861 
    843         options.flags = CHAN_KEEP_SEND_BUFFERS; 
     862        options.flags = CHAN_KEEP_SEND_BUFFERS | CHAN_RECORD_SEND_TIME; 
    844863        options.sendBuffers = 0; 
    845864        options.recvBuffers = 0; 
     
    888907        address->port = destIp.port; 
    889908        address->address = NetToHostLong(destIp.address); 
     909 
     910        childInfo->stats.destAddr = address->address; 
     911        childInfo->stats.destPort = address->port; 
    890912 
    891913        return 0; 
  • Whitix/branches/netchannel/user/sdk/network/tcp/tcp.h

    r2099 r2101  
    88#include <net/ipv4.h> 
    99#include <net/tcp.h> 
     10 
     11#include "../stats/stats.h" 
    1012 
    1113#define CHAN_TYPE_TCP           3 
     
    2022        ulong droppedPackets, droppedBytes; 
    2123        ulong retxBytesSent, retxPacketsSent; 
    22         ulong wrongSumPackets, wrongSumBytes; 
     24        ulong minWin, maxWin; 
    2325 
    2426        /* (Mainly) protocol */ 
     
    6567        struct UserTimer retxTimer; 
    6668        int retxTimerHnd; 
    67         int rto; 
     69        int rto, rtt, mdev; 
    6870 
    6971        /* Sequence numbers. */ 
     
    111113 
    112114        /* Events */ 
    113         struct AsyncCtx* remWinChanged;  
     115        struct AsyncCtx *remWinChanged, *retxTimerOut, *localWinChanged, *droppedPkts;  
    114116}; 
     117 
     118extern int _netDebug; 
     119 
     120#ifndef DEBUG_TCP 
     121#define DEBUG_TCP 0x1 
     122#endif 
    115123 
    116124//#define TCP_DEBUG_Y 
    117125 
    118126#ifdef TCP_DEBUG_Y 
    119 #define TCP_DEBUG(args) do { printf("[TCP]: %d: %s: ", SysGetCurrentThreadId(), __func__); printf args; printf(" (%s:%d)\n", __FILE__, __LINE__); } while (0) 
     127#define TCP_DEBUG(args) do { if (_netDebug & DEBUG_TCP) { printf("[TCP]: %d: %s: ", SysGetCurrentThreadId(), __func__); printf args; printf(" (%s:%d)\n", __FILE__, __LINE__); } } while (0) 
    120128#else 
    121129#define TCP_DEBUG(args) 
     
    132140{ 
    133141        return ((struct Ipv4EndPoint*)(&socket->channel->src))->port; 
     142} 
     143 
     144static inline ulong TcpDestAddress(struct TcpSocket* socket) 
     145{ 
     146        return ((struct Ipv4EndPoint*)(&socket->channel->dest))->address; 
    134147} 
    135148 
     
    156169#define TCP_AGAIN       -2 
    157170 
     171/* Buffer flags */ 
     172#define TCP_FLAG_RETX   0x01 
     173 
    158174#endif 
  • Whitix/branches/netchannel/user/sdk/network/tcp/tcp_congestion.c

    r2099 r2101  
     1#include <net/socket.h> 
     2#include <net/inet_raw.h> 
     3 
    14#include "tcp.h" 
    25#include "seq.h" 
     
    47#include "../ipv4.h" 
    58#include "tcp_congestion.h" 
     9#include "tcp_timer.h" 
    610 
    711int TcpInSlowStart(struct TcpSocket* tcpSock) 
     
    1317{ 
    1418        ChanSendBuffer* curr; 
     19        struct IpHeader* ipHeader; 
     20        int ret; 
    1521 
    1622        pthread_mutex_lock(&tcpSock->retxLock); 
     
    3541                return 0; 
    3642 
    37         TCP_DEBUG(("curr, fast retransmit!")); 
    38         SysExit(0); 
     43        ipHeader = (struct IpHeader*)(ChanSendBufferData(tcpSock->channel, curr)); 
     44        TCP_DEBUG(("Resending length=%u", NetToHostShort(ipHeader->totalLength))); 
     45 
     46        ChanSendBuffSetFlags(curr, TCP_FLAG_RETX); 
     47 
     48        if ((ret = ChanBufferSend(tcpSock->channel, curr, NetToHostShort(ipHeader->totalLength))) < 0) 
     49        { 
     50                TCP_DEBUG(("Could not retransmit in retransmission timeout handler, ret=%d", ret)); 
     51                tcpSock->retx = curr; 
     52        } 
    3953 
    4054out: 
     
    112126void TcpRetxTimeout(void* data) 
    113127{ 
    114         printf("TcpRetxTimeout\n"); 
    115 } 
     128        struct TcpSocket* socket = (struct TcpSocket*)data; 
     129        ChanSendBuffer* retx; 
     130        struct IpHeader* ipHeader; 
     131        int ret; 
     132    ulong oldRto = socket->rto; 
     133 
     134        pthread_mutex_lock(&socket->retxLock); 
     135        if ((retx = socket->retx)) 
     136                socket->retx = ChanSendBuffGetPriv(socket->retx); 
     137        pthread_mutex_unlock(&socket->retxLock); 
     138 
     139        if (retx) 
     140        { 
     141                ipHeader = (struct IpHeader*)(ChanSendBufferData(socket->channel, retx)); 
     142                TCP_DEBUG(("Resending length=%u", NetToHostShort(ipHeader->totalLength))); 
     143 
     144                ChanSendBuffSetFlags(retx, TCP_FLAG_RETX); 
     145 
     146                if ((ret = ChanBufferSend(socket->channel, retx, NetToHostShort(ipHeader->totalLength))) < 0) 
     147                { 
     148                        TCP_DEBUG(("Could not retransmit in retransmission timeout handler, ret=%d", ret)); 
     149                        socket->retx = retx; 
     150                } 
     151        } 
     152 
     153        socket->rto = MIN(socket->rto << 1, TCP_MSL); 
     154        TCP_DEBUG(("rto = %u", socket->rto)); 
     155 
     156        TcpRetxTimerStart(socket); 
     157 
     158        if (socket->socket && socket->retxTimerOut) 
     159    { 
     160        /* Let application know retransmit timed out. Supply the timing information. */ 
     161        struct TcpRetxInfo txInf; 
     162 
     163        txInf.rtoOld = oldRto; 
     164        txInf.rtoNew = socket->rto; 
     165        txInf.rtt = socket->rtt; 
     166        txInf.mdev = socket->mdev; 
     167 
     168                _SockEventFireList(socket->socket, socket->retxTimerOut, &txInf, sizeof(struct TcpRetxInfo)); 
     169   } 
     170} 
     171 
     172void TcpUpdateRtt(struct TcpSocket* tcpSock, ChanSendBuffer* buffer) 
     173{ 
     174        int diff = 0; 
     175        struct Time time, sendTime, result; 
     176 
     177        if (ChanSendBuffGetFlags(buffer) & TCP_FLAG_RETX) 
     178                return; 
     179 
     180        ChanSendBuffGetTime(buffer, &sendTime); 
     181        SysGetTime(&time); 
     182        TimerSub(&time, &sendTime, &result); 
     183        diff = result.seconds*1000; 
     184 
     185        if (diff <= 1000) 
     186                diff = 1000; 
     187 
     188        /* The new RTT estimate is 3/4*old + 1/4*new */ 
     189        diff -= (tcpSock->rtt >> 2); 
     190        tcpSock->rtt += diff; 
     191 
     192        if (diff < 0) 
     193                diff = -diff; 
     194 
     195        diff -= (tcpSock->mdev >> 2); 
     196        tcpSock->mdev += diff; 
     197 
     198        /* Retransmission timeout */ 
     199        tcpSock->rto = MIN(((tcpSock->rtt >> 2) + tcpSock->mdev) >> 1, TCP_MSL); 
     200 
     201        if (tcpSock->rto < 1000) 
     202                tcpSock->rto = 1000; 
     203 
     204        TCP_DEBUG(("rto = %u\n", tcpSock->rto)); 
     205} 
  • Whitix/branches/netchannel/user/sdk/network/tcp/tcp_congestion.h

    r2099 r2101  
    22#define TCP_CONGESTION_H 
    33 
     4#include <net/channels.h> 
    45#include <types.h> 
    56 
     
    1011void TcpCongestion(struct TcpSocket* tcpSock); 
    1112void TcpRetxTimeout(void* data); 
     13void TcpUpdateRtt(struct TcpSocket* tcpSock, ChanRecvBuffer* buffer); 
    1214 
    1315#endif 
  • Whitix/branches/netchannel/user/sdk/network/tcp/tcp_events.c

    r2099 r2101  
    2929int TcpRetxTimerOutRegister(Socket* socket, struct TcpEventCtx* ctx) 
    3030{ 
    31         /* 1. Verify socket is TCP */ 
    32         /* 2. Use internal buffer when posting events. Have general TCP state structure */ 
     31        struct TcpSocket* tcpSock = TcpInfo(socket); 
     32        ctx->ctx.socket = socket; 
    3333 
    34         return 0; 
     34        return SocketAsyncAdd(&tcpSock->retxTimerOut, &ctx->ctx); 
    3535} 
    3636 
    37 int TcpDroppedPktsRegister(Socket* socket, struct TcpEventCtx* ctx, int flags) 
     37int TcpDroppedPktsRegister(Socket* socket, struct TcpEventCtx* ctx) 
    3838{ 
    39         /* 1. Verify socket is TCP */ 
    40         /* 2. flags could be CHECKSUM, DUP, etc. */ 
    41         return 0; 
     39        struct TcpSocket* tcpSock = TcpInfo(socket); 
     40        ctx->ctx.socket = socket; 
     41 
     42        return SocketAsyncAdd(&tcpSock->droppedPkts, &ctx->ctx); 
    4243} 
    4344 
    4445int TcpLocalWinRegister(Socket* socket, struct TcpEventCtx* ctx) 
    4546{ 
    46         return 0; 
     47        struct TcpSocket* tcpSock = TcpInfo(socket); 
     48        ctx->ctx.socket = socket; 
     49 
     50        return SocketAsyncAdd(&tcpSock->localWinChanged, &ctx->ctx); 
    4751} 
    4852 
     
    5458        return SocketAsyncAdd(&tcpSock->remWinChanged, &ctx->ctx); 
    5559} 
    56  
    57 int TcpStateUpdateRegister(Socket* socket, struct TcpEventCtx* ctx, int seconds) 
    58 { 
    59         return 0; 
    60 } 
    61  
    62 int TcpSetTransfer(Socket* socket, int length, int blockSize) 
    63 { 
    64         return 0; 
    65 } 
  • Whitix/branches/netchannel/user/sdk/network/tcp/tcp_input.c

    r2100 r2101  
    2424} 
    2525 
     26void TcpDropPacket(struct TcpSocket* tcp, int length) 
     27{ 
     28        tcp->stats.droppedPackets++; 
     29        tcp->stats.droppedBytes += length; 
     30 
     31        if (tcp->socket && tcp->droppedPkts) 
     32        { 
     33                _SockEventFireList(tcp->socket, tcp->droppedPkts, NULL, 0); 
     34        } 
     35} 
     36 
    2637struct TcpHeader* TcpReadPacket(struct TcpSocket* tcpSock, ChanRecvBuffer** buffer, int* 
    2738                length) 
     
    4556        if (tcpHeader->checkSum != TcpCalcChecksum(ipHeader, tcpHeader, *length)) 
    4657        { 
    47                 tcpSock->stats.wrongSumPackets++; 
    48                 tcpSock->stats.wrongSumBytes += *length - sizeof(struct IpHeader); 
     58                TcpDropPacket(tcpSock, *length - sizeof(struct IpHeader)); 
    4959                goto error; 
    5060        } 
  • Whitix/branches/netchannel/user/sdk/network/tcp/tcp_input.h

    r2100 r2101  
    55                length); 
    66int TcpSocketRecv(Socket* socket, void* buffer, unsigned long length, int flags); 
     7void TcpDropPacket(struct TcpSocket* tcpSock, int length); 
    78 
    89#endif 
  • Whitix/branches/netchannel/user/sdk/network/tcp/tcp_output.c

    r2099 r2101  
    302302        { 
    303303                TcpAddToTxQueue(tcpSock, buffer, length); 
    304         }else{ 
    305 //              if (TcpInSlowStart(tcpSock) || (length + TCP_HEADER_MAX_SIZE + sizeof (struct IpHeader)) >= tcpSock->mss) 
    306 //              { 
    307                         ret = TcpTryToSend(tcpSock, buffer, length); 
    308 //              }else{ 
    309 //                      printf("SMALL\n"); 
    310 //                      SysExit(0); 
    311 //              } 
    312         } 
     304        }else 
     305                ret = TcpTryToSend(tcpSock, buffer, length); 
    313306 
    314307        if (ret == TCP_AGAIN || ret >= 0) 
     
    349342                        tcpSock->bytesInFlight -= (endSeq-seq); 
    350343 
    351 //                      TCP_DEBUG(("%d, %d bytes", tcpSock->pktsInFlight, tcpSock->bytesInFlight)); 
    352  
    353                         if (tcpSock->pktsInFlight == 0) 
    354                         { 
    355                                 TCP_DEBUG(("queue = %#X (%d)\n", tcpSock->tx, tcpSock->stats.totalBytesSent)); 
    356                         } 
    357  
    358344                        tcpSock->dupAckSeq = tcpSock->lastRetx = seq; 
     345 
     346                        /* Update round-trip time estimate */ 
     347                        TcpUpdateRtt(tcpSock, buffer); 
    359348 
    360349                        ChanSendBuffSetPriv(buffer, NULL); 
     
    384373} 
    385374 
     375 
    386376int TcpSendPacket(struct TcpSocket* socket, struct TcpHeader* header, const void* buffer, unsigned long length, int flags, ChanSendBuffer** chanBuff) 
    387377{ 
     
    403393        if (buffer) 
    404394                memcpy(data + sizeof(struct IpHeader) + tcpHeaderSize, buffer, length);  
    405  
    406395        pthread_mutex_lock(&socket->sendUpdate); 
    407396 
     
    424413        pthread_mutex_unlock(&socket->sendUpdate); 
    425414        return (ret < 0) ? ret : length; 
     415 
    426416} 
    427417 
  • Whitix/branches/netchannel/user/sdk/network/tcp/tcp_states.c

    r2099 r2101  
    3434        } 
    3535 
    36         if (states[tcpSock->state] != NULL) 
     36        if (states[tcpSock->state] != NULL /*&& (TcpGetTs() & 1)*/) 
    3737                ret = states[tcpSock->state](tcpSock, header, buffer, length); 
     38 
     39#if 0 
     40        else 
     41                printf("Dropped! (%u, %d)\n", length, tcpSock->state); 
     42#endif 
    3843 
    3944        if (ret == TCP_ERROR) 
     
    6166                ChanRecvBuffFree(tcpSock->channel, buffer); 
    6267 
    63         /* Event handling */ 
     68        /* Event handling and general statistical update */ 
    6469        if (!ret && tcpSock->socket) 
    6570        { 
     
    7075        } 
    7176 
     77        if (tcpSock->remoteWindow < tcpSock->stats.minWin) 
     78                tcpSock->stats.minWin = tcpSock->remoteWindow; 
     79 
     80        if (tcpSock->remoteWindow > tcpSock->stats.maxWin) 
     81                tcpSock->stats.maxWin = tcpSock->remoteWindow; 
     82 
    7283        return 0; 
    7384} 
  • Whitix/branches/netchannel/user/sdk/network/tcp/tcp_timer.c

    r2099 r2101  
    3737void TcpTimeWaitClose(struct TcpSocket* tcpSock) 
    3838{ 
    39         printf("TIME-WAIT: CLOSE\n"); 
    40  
    4139        TcpChangeState(tcpSock, TCP_CLOSED); 
    4240} 
     
    6967        spec.seconds = tcpSock->rto/1000; 
    7068        spec.useconds = tcpSock->rto % 1000; 
    71          
     69 
    7270        SysTimerModify(tcpSock->retxTimerHnd, &spec, NULL); 
    7371} 
  • Whitix/branches/netchannel/user/sdk/network/tcp/tcp_timer.h

    r2100 r2101  
    55#define TCP_MSL         2*60 /* minutes */ 
    66#define TCP_RTO_DEFAULT         3*1000 
     7#define TCP_RTT_DEFAULT         1*1000 
    78 
    89int TcpTimerInit(struct TcpSocket* tcpSock);