Hello, big bro..
Есть проблема..
Делаю сервер с 3 портами, работающими каждый независимо друг от друга...
Использую неблокируюшие сокеты + select. Возникает следующая проблема - после добавления слушающих сокетов в множество селект прекрасно разбирает входящие соединения и разруливает их, нок примеру если мне необходимо принимать данные от нескольких клиентов (одновременно), после вызова accept только первый клиент посылает данные (на примере эхо-сервера и телнета проверил), а остальные хоть и подключаются, но при вводе строк не получают ответа.. Как только первый клиент отрубается, начинают работать остальные... Как поправить, где пропустил чего?
--
функция принимает 3 порта, функция makeServerSocket принимает порт и возвращает дискриптор слушающего сокета (в ней выполняются последовательно socket, bind, listen).
В этой функции также включен nonblocking mode через fcntl.
Спс :)
P.S. всякие kqueue, epoll и пр. не предлагать плиз взамен этому. Ибо геморрно и разбираться надо :( fork тоже не катит, как и треды...
int startServer(int emulator_port, int controller_port, int system_port)
{
struct timeval timeout;
fd_set master_set;
fd_set working_set;
char buffer[MAXLINE+1];
int end_server=0;
int close_conn;
int rc,desc_ready;
int max_sd,new_sd;
int i,len;
int emu_sock, contr_sock, sys_sock;
FD_ZERO(&master_set);
emu_sock=makeServerSocket(emulator_port);
FD_SET(emu_sock,&master_set);
contr_sock=makeServerSocket(controller_port);
FD_SET(contr_sock,&master_set);
sys_sock=makeServerSocket(system_port);
FD_SET(sys_sock,&master_set);
max_sd=emu_sock+contr_sock+sys_sock;
do
{
memcpy(&working_set,&master_set,sizeof(master_set));
printf("Waiting on select()...\n");
rc=select(max_sd+1,&working_set,NULL,NULL,NULL);
if(rc<0)
{
perror("select() failed");
break;
}
if(rc==0)
{
printf("select() timed out. End program.\n");
break;
}
desc_ready=rc;
for(i=0;i<=max_sd && desc_ready>0;++i)
{
if(FD_ISSET(i,&working_set))
{
desc_ready-=1;
if(i==emu_sock || i==contr_sock || i==sys_sock)
{
do
{
if(i==emu_sock)
{
new_sd=accept(i,NULL,NULL);
if(fcntl(i,F_SETFL,O_NONBLOCK)<0)
{
perror("fcntl failed");
return -1;
}
printf("Emulator socket is readable\n");
}
else if(i==contr_sock)
{
new_sd=accept(i,NULL,NULL);
if(fcntl(i,F_SETFL,O_NONBLOCK)<0)
{
perror("fcntl failed");
return -1;
}
printf("Controller socket is readable\n");
}
else
{
new_sd=accept(i,NULL,NULL);
if(fcntl(i,F_SETFL,O_NONBLOCK)<0)
{
perror("fcntl failed");
return -1;
}
printf("System socket is readable\n");
}
if(new_sd<0)
{
if(errno!=EWOULDBLOCK)
{
perror("accept() failed");
end_server=1;
}
break;
}
printf("New incoming connection - %d\n",new_sd);
FD_SET(new_sd,&master_set);
if(new_sd>max_sd)
max_sd=new_sd;
} while (new_sd!=-1);
}
else
{
printf("Descriptor %d is readable\n",i);
close_conn=0;
do
{
rc=recv(i,buffer,sizeof(buffer),0);
if(rc<0)
{
if(errno!=EWOULDBLOCK)
{
perror("recv() failed");
close_conn=1;
}
break;
}
if(rc==0)
{
printf("Connection closed\n");
close_conn=1;
break;
}
len=rc;
printf("%d bytes received\n",len);
rc=send(i,buffer,len,0);
if(rc<0)
{
perror("send() failed");
close_conn=1;
break;
}
} while (1);
if(close_conn)
{
close(i);
FD_CLR(i,&master_set);
if(i==max_sd)
{
while(FD_ISSET(max_sd,&master_set) == 0)
max_sd-=1;
}
}
}
}
}
} while(!end_server);
for(i=0;i<=max_sd;++i)
{
if(FD_ISSET(i,&master_set))
close(i);
}
return 0;
}
У вас один тред который всё слушает и обрабатывает, значит когда первый клиент начинает слать данные то этот тред занят с этим клиентом и физически не может отвечать на другие запросы, как только с первым клиентом разобрались перешли к следующему в очереди.
По моему всё работает правильно, с одним тредом по другому сделать сложно.
>У вас один тред который всё слушает и обрабатывает, значит когда первый
>клиент начинает слать данные то этот тред занят с этим клиентом
>и физически не может отвечать на другие запросы, как только с
>первым клиентом разобрались перешли к следующему в очереди.
>По моему всё работает правильно, с одним тредом по другому сделать сложно.
>Значит после того, как FD_ISSET просигналил о появлении соединения, нужно создавать отдельный поток и там уже вызывать функцию accept(), правильно я понимаю?
> Значит после того, как FD_ISSET просигналил о появлении соединения, нужно создавать
>отдельный поток и там уже вызывать функцию accept(), правильно я понимаю?Это один из вариантов. Есть как минимум ещё два стандартных пути:
- вместо потоков для обработки данных каждого соединения использовать дочерние процессы
- использовать более совершенные по сравнению с select() механизмы (ну хотя бы вызов poll()) и обеспечить переключение контекста в одном-единственном потоке, с ожиданием активности клиентов и их готовности к приёму данных
>[оверквотинг удален]
>>отдельный поток и там уже вызывать функцию accept(), правильно я понимаю?
>
>Это один из вариантов. Есть как минимум ещё два стандартных пути:
>
> - вместо потоков для обработки данных каждого соединения использовать дочерние
>процессы
>
> - использовать более совершенные по сравнению с select() механизмы (ну
>хотя бы вызов poll()) и обеспечить переключение контекста в одном-единственном потоке,
>с ожиданием активности клиентов и их готовности к приёму данныхИ есть "самый" стандартный вариант :), который, очевидно и был нужен автору: не нужно цикла do-while вокруг recv()+send(), а вместо этого, нужно:
а) создавать для каждого сокета свой буффер и запоминать сколько уже данных записано в каждый буффер (текущая позиция в буфере, начальное значение 0),
б) для каждого сокета готового к чтению, прочитать (recv()) (без цикла - один раз!) столько, сколько сокет отдаст, в буффер этого сокета начиная с текущей позиции, и потом увеличить текущую позицию на количество байт реально прочитанных в этот раз (значение возвращаемое recv()), чтобы в следующий раз дописать в буфер новые данные, а не затереть уже прочитанные;
в) когда все ожидаемые данные прочитаны из конкретного сокета (или по длинне данных, или по какому-то спец. символу, например, NewLine), заносить дескрипторы таких сокетов во второй FD_SET и подавать этот FD_SET в 3-й параметер select() (see man select). Ну и, понятно, проверять в цикле готовность сокетов из этого FD_SET к записи;
г) для сокетов готовых к записи, так же создавать буферы с исходящими данными, с запоминанием текущей позиции для каждого буфера,
д) и точно так же как и при чтении сокетов, для каждого сокета готового к записи, записать (send()) (без цикла - один раз!) столько, сколько сокет возьмет, из его исходящего буфера, начиная с его текущей позиции, и увеличить эту текущую позицию на количество реально в этот раз записанных байт (значение возвращаемое send()), чтобы в следующий раз записать в сокет следующую порцию данных.Конечно, возможны варианты разной степени сложности, но основы работы с select() или poll() и иже с ними именно такие.
Прочитал сам свое сообщение, по-моему вышло не совсем понятно. Короче, цикл должен быть один, вокруг select(), и без вложеных циклов. Когда select() сработает, нужно проверять готовность всех сокетов на чтение/запись/ошибку (из FD_SET'ов которые были поданы как 2-й, 3-й и 4-и параметер select()'у. Для каждого активного сокета, должен быть свой, отдельный, буффер с данными, и запомнена текущая позиция в каждом буфере, которая нужна чтобы точно знать сколько мы уже прочитали из сокета (если мы сейчас читаем из этого сокета) или записали (если мы сейчас пишем в этот сокет). Нужен также некий протокол который определяет когда все ожидаемые данные прочитаны из конкретного сокета (напр. NewLine или фиксированная длинна, или, как в HTTP, длинна данных передается в начале самих данных). Сразу после accept(), сокет находится в режиме чтения, когда (по протоколу) все данные прочитаны, сокет переводится в режим записи. Вот, такое вот дополнение к моему сообщению, может, так яснее.А так как реализовано автором, несмотря на то что сокеты в non-blocking режиме, вложеные циклы фактически реализуют блокировки для каждого сокета который только сказал что у него есть начало входящих данных. И кроме того, так как эти циклы написаны, если сеть решит передать данные в два приема, так что первый recv() прочитает только часть данных (а так вполне может случится, ведь сокет non-blocking, а сети никто не указ), второй recv() затрет в буфере данные прочитанные первым recv()'ом - оба recv()'а пишут с самого начала буфера.
>[оверквотинг удален]
>>отдельный поток и там уже вызывать функцию accept(), правильно я понимаю?
>
>Это один из вариантов. Есть как минимум ещё два стандартных пути:
>
> - вместо потоков для обработки данных каждого соединения использовать дочерние
>процессы
>
> - использовать более совершенные по сравнению с select() механизмы (ну
>хотя бы вызов poll()) и обеспечить переключение контекста в одном-единственном потоке,
>с ожиданием активности клиентов и их готовности к приёму данныхПотоки нужны, ибо планируется обеспечить взаимодействие между клиентами на разных портах, через процессы это вроде тяжело реализовать...
P.S. Может есть у кого примеры серверов с несколькими работающими портами и вызовами типа select\poll\epoll...?
>>[оверквотинг удален]
>Потоки нужны, ибо планируется обеспечить взаимодействие между клиентами на разных портах,
> через процессы это вроде тяжело реализовать...Через процессы можно использовать IPC, но у меня опыта в IPC нету, поэтому тоже кажеться что сложно :-)
>P.S. Может есть у кого примеры серверов с несколькими работающими портами и
>вызовами типа select\poll\epoll...?Примеров у меня нету, но могу посоветовать использовать какую-нибудь библиотеку, котороя умеет делать мультплексирование и прочие низкоуровневые штуки.
для C++ можно посмотреть ACE Framework (http://www.cs.wustl.edu/~schmidt/ACE.html) или POCO C++ (http://pocoproject.org/), обе библиотеки имеют примеры.
для C можно использовать glib (http://library.gnome.org/devel/glib/2.20/), я думаю что найдётся много OpenSource прокетов где можно посмотреть примеры использования glib, например pidgin использует glib
>Потоки нужны, ибо планируется обеспечить взаимодействие между клиентами на разных портах, через процессы это вроде тяжело реализовать...Потоки нужны, если обработка тяжёлая и хочется заюзать несколько процов. Количество потоков определяется количеством процов. А клиентов всех засовывать в select (poll/epoll...) и обрабатывать асинхронно, это никак не помешает никаким взаимодействиям.
Фуф, вроде справился, правда код вырос в 2 раза, но основную цель выполняет - есть 3 открытых порта, к к-ым может прицепиться множество народу на каждый, и будут они работать все вместе..
И вообщем ушел я от селектов...
Сделал на epoll, вроде он побыстрее будет судя по диаграммкам С10К...
Кого заинтересует - могу выложить код, не жалко.. Сам долго гуглил на более-менее нормальные примеры epoll.
>Фуф, вроде справился, правда код вырос в 2 раза, но основную цель
>выполняет - есть 3 открытых порта, к к-ым может прицепиться множество
>народу на каждый, и будут они работать все вместе..
>И вообщем ушел я от селектов...
>Сделал на epoll, вроде он побыстрее будет судя по диаграммкам С10К...
>Кого заинтересует - могу выложить код, не жалко.. Сам долго гуглил на
>более-менее нормальные примеры epoll.Выложи плиз :), интересно стало.
>Выложи плиз :), интересно стало.Держи, не жалко. Думаю, разберешься что куда...
Это не итоговая версия - здесь много сообщений для отслеживания ошибок..int startServer(int emulator_port, int controller_port, int system_port)
{
int emu_sock=0, contr_sock=0, sys_sock=0;
int emu_fd, contr_fd, sys_fd;
int epfd=0;
struct sockaddr_in my_addr,emu_addr,contr_addr,sys_addr;
int res=0,res1=0,res2=0;
int i=0;
int new_fd,kdpfd,ret;
static struct epoll_event ev;
static struct epoll_event events[EPOLL_QUEUE_LEN]; /*When no size, error - Bad address*/
int len=sizeof(struct sockaddr_in);
int curfds,nfds;
int n;
if((emu_sock=makeServerSocket(emulator_port))==-1)
{
perror("error emulator listen");
return -1;
}
if((contr_sock=makeServerSocket(controller_port))==-1)
{
perror("error controller listen");
return -1;
}
if((sys_sock=makeServerSocket(system_port))==-1)
{
perror("error system listen");
return -1;
}
if((epfd=epoll_create(EPOLL_QUEUE_LEN))==-1)
{
perror("epoll_create error");
return -1;
}
ev.events= EPOLLIN | EPOLLET;
ev.data.fd=emu_sock;
if(epoll_ctl(epfd,EPOLL_CTL_ADD,emu_sock,&ev)<0)
{
perror("epoll_ctl error");
return -1;
}
ev.data.fd=contr_sock;
if(epoll_ctl(epfd,EPOLL_CTL_ADD,contr_sock,&ev)<0)
{
perror("epoll_ctl error");
return -1;
}
ev.data.fd=sys_sock;
if(epoll_ctl(epfd,EPOLL_CTL_ADD,sys_sock,&ev)<0)
{
perror("epoll_ctl error");
return -1;
}
curfds=1;
while(1)
{
/*waiting for some operations on socket*/nfds=epoll_wait(epfd,events,curfds,-1);
if(nfds==-1)
{
perror("inside error in epoll_wait");
break;
}
for (n = 0; n < nfds; ++n)
{
if (events[n].data.fd == emu_sock)
{
emu_fd = accept(emu_sock, (struct sockaddr *)&emu_addr,&len);
if (emu_fd < 0)
{
perror("accept from emu_sock");
continue;
}
else
printf("Connection allocated socket is:%d\n", emu_fd);
setNonBlocking(emu_fd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = emu_fd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, emu_fd, &ev) < 0)
{
perror("inside error in epoll_wait");
fprintf(stderr, "to socket %d to join epoll failed!%s\n",emu_fd, strerror(errno));
return -1;
}
curfds++;
}
else if (events[n].data.fd == contr_sock)
{
contr_fd = accept(contr_sock, (struct sockaddr *)&contr_addr,&len);
if (contr_fd < 0)
{
perror("accept from contr_sock");
continue;
}
else
printf("Connection allocated socket is:%d\n", contr_fd);
setNonBlocking(contr_fd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = contr_fd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, contr_fd, &ev) < 0)
{
fprintf(stderr, "to socket %d to join epoll failed!%s\n",contr_fd, strerror(errno));
return -1;
}
curfds++;
}
else if (events[n].data.fd == sys_sock)
{
sys_fd= accept(events[n].data.fd, (struct sockaddr *)&sys_addr,&len);
if (sys_fd < 0)
{
perror("accept from sys_sock");
continue;
}
else
printf("Connection allocated socket is:%d\n", sys_fd);
setNonBlocking(sys_fd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = sys_fd;
if ((epoll_ctl(epfd, EPOLL_CTL_ADD, sys_fd, &ev))==-1)
{
fprintf(stderr, "To socket [%d] to join epoll failed! %s\n",sys_fd, strerror(errno));
return -1;
}
curfds++;
}
else if(events[n].data.fd == emu_fd)
{
printf("Descriptor readable from EMULATOR accept\n");
ret = handle_message_emulator(events[n].data.fd);
if (ret < 1 && errno != 11)
{
epoll_ctl(epfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
curfds--;
}
}
else if(events[n].data.fd == contr_fd)
{
printf("Descriptor readable from CONTROLLER accept\n");
ret = handle_message_controller(events[n].data.fd);
if (ret < 1 && errno != 11)
{
epoll_ctl(epfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
curfds--;
}
}
else if(events[n].data.fd == sys_fd)
{
printf("Descriptor readable from SYSTEM accept\n");
ret = handle_message_system(events[n].data.fd);
if (ret < 1 && errno != 11)
{
epoll_ctl(epfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
curfds--;
}
}
else
{
printf("UNKNOWN readable descriptor\n");
ret = handle_message(events[n].data.fd);
if (ret < 1 && errno != 11)
{
epoll_ctl(epfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
curfds--;
}
}
}
}
return 0;
}
Здесь handle_message_* - свои обработчики для каждого accept'a
Пример обычного обработчика привожу ниже...int handle_message(int new_fd)
{
char buf[MAXBUF + 1];
int rlen;
bzero(buf, MAXBUF + 1);
rlen = recv(new_fd, buf, MAXBUF, 0);
if (rlen > 0)
printf ("Descriptor [%d] receives the message successfully: %s . Total of %d bytes of data \n", new_fd, buf, rlen);
else {
if (rlen < 0)
printf ("Message reception failed! Error code is %d, error message is %s \n", errno, strerror(errno));
close(new_fd);
return -1;
}
return rlen;
}На здоровье ;)