boost async tcp client
J'ai juste commencé à travailler avec boost.
Je suis de l'écriture TCP client-serveur avec async sockets.
La tâche est la suivante:
- Client envoie au serveur un certain nombre
- Client peut envoyer un autre nubmer avant de recevoir la réponse du serveur.
- Serveur reçoit un numéro, faire un peu de calcul et renvoie le résultat au client.
- Plusieurs clients peuvent être connectés à un serveur.
Fonctionne désormais la suivante
- envoyer un numéro de client pour rompre
- serveur reçoit un certain nombre de thread en cours et calcule droit dans le OnReceive gestionnaire (je sais c'est mal...mais comment je dois commencer un nouveau thread pour faire le calcul en parallèle)
- serveur envoie la réponse en arrière, mais le client déjà déconnecté
Comment peut autoriser le client à saisir des nombres au clavier et à attendre une réponse du serveur en même temps?
Et pourquoi est-ce que mon client de ne pas attendre la réponse de rompre?
Le code de client:
using boost::asio::ip::tcp;
class TCPClient
{
public:
TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter);
void Close();
private:
boost::asio::io_service& m_IOService;
tcp::socket m_Socket;
string m_SendBuffer;
static const size_t m_BufLen = 100;
char m_RecieveBuffer[m_BufLen*2];
void OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter);
void OnReceive(const boost::system::error_code& ErrorCode);
void OnSend(const boost::system::error_code& ErrorCode);
void DoClose();
};
TCPClient::TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter)
: m_IOService(IO_Service), m_Socket(IO_Service), m_SendBuffer("")
{
tcp::endpoint EndPoint = *EndPointIter;
m_Socket.async_connect(EndPoint,
boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));
}
void TCPClient::Close()
{
m_IOService.post(
boost::bind(&TCPClient::DoClose, this));
}
void TCPClient::OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter)
{
cout << "OnConnect..." << endl;
if (ErrorCode == 0)
{
cin >> m_SendBuffer;
cout << "Entered: " << m_SendBuffer << endl;
m_SendBuffer += "\0";
m_Socket.async_send(boost::asio::buffer(m_SendBuffer.c_str(),m_SendBuffer.length()+1),
boost::bind(&TCPClient::OnSend, this,
boost::asio::placeholders::error));
}
else if (EndPointIter != tcp::resolver::iterator())
{
m_Socket.close();
tcp::endpoint EndPoint = *EndPointIter;
m_Socket.async_connect(EndPoint,
boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));
}
}
void TCPClient::OnReceive(const boost::system::error_code& ErrorCode)
{
cout << "receiving..." << endl;
if (ErrorCode == 0)
{
cout << m_RecieveBuffer << endl;
m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
}
else
{
cout << "ERROR! OnReceive..." << endl;
DoClose();
}
}
void TCPClient::OnSend(const boost::system::error_code& ErrorCode)
{
cout << "sending..." << endl;
if (!ErrorCode)
{
cout << "\""<< m_SendBuffer <<"\" has been sent" << endl;
m_SendBuffer = "";
m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
}
else
{
cout << "OnSend closing" << endl;
DoClose();
}
}
void TCPClient::DoClose()
{
m_Socket.close();
}
int main()
{
try
{
cout << "Client is starting..." << endl;
boost::asio::io_service IO_Service;
tcp::resolver Resolver(IO_Service);
string port = "13";
tcp::resolver::query Query("127.0.0.1", port);
tcp::resolver::iterator EndPointIterator = Resolver.resolve(Query);
TCPClient Client(IO_Service, EndPointIterator);
cout << "Client is started!" << endl;
cout << "Enter a query string " << endl;
boost::thread ClientThread(boost::bind(&boost::asio::io_service::run, &IO_Service));
Client.Close();
ClientThread.join();
}
catch (exception& e)
{
cerr << e.what() << endl;
}
cout << "\nClosing";
getch();
}
Ici, elle est sortie de la console
Client is starting...
Client is started!
OnConnect...
12
Entered: 12
sending...
"12" has been sent
receiving...
ERROR! OnReceive...
Closing
Partie serveur
class Session
{
public:
Session(boost::asio::io_service& io_service)
: socket_(io_service)
{
dataRx[0] = '\0';
dataTx[0] = '\0';
}
tcp::socket& socket()
{
return socket_;
}
void start()
{
socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
boost::bind(&Session::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
{
cout << "reading..." << endl;
cout << "Data: " << dataRx << endl;
if (!error)
{
if (!isValidData())
{
cout << "Bad data!" << endl;
sprintf(dataTx, "Bad data!\0");
dataRx[0] = '\0';
}
else
{
sprintf(dataTx, getFactorization().c_str());
dataRx[0] = '\0';
}
boost::asio::async_write(socket_,
boost::asio::buffer(dataTx, max_length*2),
boost::bind(&Session::handle_write, this,
boost::asio::placeholders::error));
}
else
{
delete this;
}
}
void handle_write(const boost::system::error_code& error)
{
cout << "writing..." << endl;
if (!error)
{
cout << "dataTx sent: " << dataTx << endl;
dataTx[0] = '\0';
socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
boost::bind(&Session::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
else
{
delete this;
}
}
string getFactorization() const
{
//Do something
}
bool isValidData()
{
locale loc;
for (int i = 0; i < strlen(dataRx); i++)
if (!isdigit(dataRx[i],loc))
return false;
return true;
}
private:
tcp::socket socket_;
static const size_t max_length = 100;
char dataRx[max_length];
char dataTx[max_length*2];
};
class Server
{
public:
Server(boost::asio::io_service& io_service, short port)
: io_service_(io_service),
acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
{
Session* new_session = new Session(io_service_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&Server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
void handle_accept(Session* new_session, const boost::system::error_code& error)
{
if (!error)
{
new_session->start();
new_session = new Session(io_service_);
acceptor_.async_accept(new_session->socket(),
boost::bind(&Server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
else
{
delete new_session;
}
}
private:
boost::asio::io_service& io_service_;
tcp::acceptor acceptor_;
};
int main(int argc, char* argv[])
{
cout << "Server is runing..." << endl;
try
{
boost::asio::io_service io_service;
int port = 13;
Server s(io_service, port);
cout << "Server is run!" << endl;
io_service.run();
}
catch (boost::system::error_code& e)
{
std::cerr << e << "\n";
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
Du serveur de sortie
Server is runing...
Server is run!
reading...
Data: 12
writing...
dataTx sent: 13 //just send back received ++number
reading...
Data:
Votre aide sera très appréciée
========
Ajouté
Ok, je comprends. Mais vérifiez ErrorCode == boost::asio::erreur::eof ne fonctionne pas... Qu'ai-je fait de mal?
else if (ErrorCode == boost::asio::error::eof)
{
cout << "boost::asio::error::eof in OnReceive!" << endl;
}
else
{
cout << "ERROR! OnReceive..." << ErrorCode << endl;
DoClose();
}
L'impression est ERROR! OnReceive...system:10009
il semble être ma comparaison est incorrect
========
Ajouté
J'ai trouvé la cause. J'ai déjà mentionné l'utilisation async_receive
(au lieu de async_read_some
) et swaped les lignes dans main
à
ClientThread.join();
Client.Close();
Maintenant il fonctionne très bien!
Maintenant, je suis en train de lire et écrire des données à partir de/à la prise en même temps (parce que le client doit être en mesure d'envoyer des demandes supplémentaires avant la réponse du serveur est reçu.
Dans OnConnect
fonction que j'ai créer boost threads:
boost::thread addMsgThread(boost::bind(&TCPClient::addMsgLoop, this));
boost::thread receivingThread(boost::bind(&TCPClient::startReceiving, this));
boost::thread sendingThread(boost::bind(&TCPClient::startSending, this));
avec inplementation
void TCPClient::startReceiving()
{
cout << "receiving..." << endl;
m_RecieveBuffer[0] = '\0';
m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error)); //runtime error here
cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;
}
void TCPClient::receivingLoop(const boost::system::error_code& ErrorCode)
{
cout << "receiving..." << endl;
if (ErrorCode == 0)
{
cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;
m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error));
}
else
{
cout << "ERROR! receivingLoop..." << ErrorCode << endl;
DoClose();
}
}
void TCPClient::addMsgLoop()
{
while (true)
{
string tmp;
cin >> tmp;
cout << "Entered: " << tmp << endl;
tmp += "\0";
try
{
msgQueue.push(tmp);
}
catch(exception &e)
{
cerr << "Canno add msg to send queue... " << e.what() << endl;
}
}
}
Le problème est le même avec les deux receive
et send
threads: erreur d'exécution (écrit violation d'accès quelque part dans les bibliothèques boost).
void TCPClient::startReceiving()
{
...
m_Socket.async_receive(); //runtime error here
}
Dans sequent version tout fonctionne très bien (mais je ne sais pas comment la mettre plusieurs d'envoi avant de répondre).
Quelqu'un peut-il me dire comment résoudre la question ou comment mettre en œuvre cette par un autre chemin? Peut être mise en commun peut aider, mais je suis maintenant convaincu que c'est le bon sens.
OnReceive
vous devez vérifier quelle est l'erreur que vous obtenez, il peut dire ce qu'est le problème.Et bien sûr vérifier le côté serveur, pour voir ce qui s'y passe.
Débogage d'erreur s'affiche 10009
Réfléchissez un peu:
m_SendBuffer += "\0";
(Indice: Quelle est la différence entre "\0"
et ""
?) Aussi, si vous allez utiliser read_some
, c'est votre travail pour vous assurer que vous avez reçu un ensemble au niveau de l'application message.
OriginalL'auteur Torrius | 2012-10-20
Vous devez vous connecter pour publier un commentaire.
boost::asio::ip::tcp::socket::async_read_some comme son nom l'indique n'est pas garanti de lire l'intégralité des données. Il définit
error
objet deboost::asio::error::eof
lorsque le client est fini d'écrire.L'erreur que vous obtenez est de ce fait:
partie serveur
Dans
else
bloc, vous êtes en supposant que c'est une erreur de cas et la fermeture de la connexion. Ce n'est pas toujours le cas. Avantelse
, vous devez vérifier pourerror == boost::asio::error::eof
.En dehors de cela dans la lecture de gestionnaire, vous devez garder la collecte de tout ce qui est lu dans une mémoire tampon jusqu'à ce que vous frappez
error == boost::asio::error::eof
. Alors seulement, vous devez valider les données de lecture et d'écriture de retour au client.Prendre un coup d'oeil au serveur HTTP Un, Deux, Trois mise en œuvre dans exemples section.
Mise à jour: Réponse à la mise à jour de question
Vous avez thread problème de synchronisation avec le code mis à jour.
msgQueue
est à la fois accessible à partir de deux ou plusieurs threads sans serrure.Si j'ai bien compris votre problème correctement, vous souhaitez:
Vous pouvez utiliser deux boost::asio::io_service::brins pour les deux tâches. Lors de l'utilisation de l'Asio, les brins sont le moyen de synchroniser vos tâches. Asio permet de s'assurer que les tâches affichées dans un brin sont exécutées de manière synchrone.
Dans
strand1
poster unsend
tâche qui ressemble à:read_user_input -> send_to_server -> handle_send -> read_user_input
Dans
strand2
poster unread
tâche qui ressemble à:read_some -> handle_read -> read_some
Cela permettra de s'assurer
msgQueue
n'est pas accessible simultanément à partir de deux fils. L'utilisation de deux sockets pour lire et écrire sur le serveur, assurez-vous simultanés en lecture et en écriture n'est pas appelée sur le même socket.error == boost::asio::error::eof
ne fonctionne pas (voir la question ci-dessous)avez-vous d'ajouter ce pour les deux, le serveur et le client? À partir du code que vous avez posté, il semble que vous avez ajoutés uniquement aux clients de lire gestionnaire. Aussi, merci de modifier votre question initiale d'ajouter d'autres commentaires. N'ajoutez pas de ceux qu'une autre réponse.
J'ai édité la question. S'il vous plaît, jetez un oeil sur celui-ci.
mise à jour de ma réponse pour votre édité question. Veuillez créer une nouvelle question, si elle n'est pas liée à la question d'origine.
merci beaucoup pour l'aide!
OriginalL'auteur Vikas