ANNA Suite  2020b
Multipurpose development suite for Telco applications
Namespaces | Classes
anna::comm Namespace Reference

Namespaces

 handler
 
 socket
 

Classes

class  AccessPoint
 
class  Application
 
class  BinderSocket
 
class  Buffer
 
class  ByRangeDelivery
 
class  ClientSocket
 
class  Codec
 
class  Communicator
 
class  CompatCodec
 
class  CongestionController
 
class  ConnectionRecover
 
class  DatagramSocket
 
class  Delivery
 
class  Device
 
class  DirectTransport
 
struct  functions
 
class  Handler
 
class  Host
 
class  IndexedDelivery
 
class  INetAddress
 
class  LargeBinaryCodec
 
class  LiteTransport
 
class  LocalConnection
 
class  Message
 
class  Network
 
class  Poll
 
class  Receiver
 
class  ReceiverFactory
 
class  RemoteConnection
 
class  Resource
 
class  RoundRobinDelivery
 
class  sccs
 
class  Server
 
class  ServerAllocator
 
class  ServerSocket
 
class  Service
 
class  Socket
 
class  Status
 
class  SureTransport
 
class  Transport
 
class  TransportFactory
 
class  TransportFactoryImpl
 
class  Variable
 

Detailed Description

Proporciona las clases necesarias para la comunicacion entre procesos.

A continuacion presentamos el codigo de ejemplo de un servidor aritmetico. Recibe dos operadores y una operacion (+,-,*,/) y devuelve el resultado de la operacion. Para estudiar los sistemas de control de congestion hemos incorporado la posibilidad de que el servidor espere un numero de segundo indeterminado antes de dar la respuesta.

#include <iostream>
#include <anna.h>
#include <anna/comm/h>
#include <test.Request.h>
#include <test.Response.h>
using namespace std;
using namespace test;
//-----------------------------------------------------------------------------------------
// Define el comunicador de nuestra aplicacin.
//
// Las peticiones y respuestas van codificadas mediante un comm::Codec pero podriamos
// haber utilizado cualquier otro medio de codificacin ya que la capa de transporte
// es totalmente independiente del contenido del mensaje.
//
// De cara a la capa de transporte lo unico que importa es el cliente y el servidor
// codifiquen/decodifiquen de la misma forma.
//-----------------------------------------------------------------------------------------
class MyCommunicator : public comm::Communicator {
public:
MyCommunicator () {;}
void setDelay (const Millisecond &delay) { a_delay = delay; }
private:
Request a_request;
Response a_response;
Millisecond a_delay;
void eventReceiveMessage (comm::ClientSocket &, const DataBlock& data)
noexcept(false);
};
class ArithmeticServer : public comm::Application {
public:
ArithmeticServer ();
private:
MyCommunicator a_communicator;
comm::ServerSocket* a_serverSocket;
void initialize () noexcept(false);
void run () noexcept(false);
};
using namespace std;
using namespace anna::comm;
int main (int argc, const char** argv)
{
CommandLine& commandLine (CommandLine::instantiate ());
ArithmeticServer app;
srand (time (NULL));
try {
commandLine.initialize (argv, argc);
commandLine.verify ();
Logger::initialize ("arithmeticServer", new anna::TraceWriter ("file.trace", 4048000));
app.start ();
}
catch (Exception& ex) {
cout << ex.asString () << endl;
}
return 0;
}
ArithmeticServer::ArithmeticServer () :
Application ("arithmeticServer", "Servidor de operaciones aritmeticas", "1.0")
{
CommandLine& commandLine (CommandLine::instantiate ());
commandLine.add ("p", CommandLine::Argument::Mandatory, "Puerto en el que atender peticiones");
commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP en la que atender");
commandLine.add ("d", CommandLine::Argument::Mandatory, "Retardo aplicado a la contestacio");
commandLine.add ("r", CommandLine::Argument::Optional, "Indicador de reuso de direccin", false);
commandLine.add ("limit", CommandLine::Argument::Mandatory, "% de ocupacion que permitimos");
}
//-----------------------------------------------------------------------------------------
// Inicializa el servidor de sockets.
//-----------------------------------------------------------------------------------------
void ArithmeticServer::initialize ()
noexcept(false)
{
CommandLine& cl (CommandLine::instantiate ());
int port = cl.getIntegerValue ("p");
const comm::Device* device = Network::instantiate ().find (Device::asAddress (cl.getValue ("a")));
a_serverSocket = new ServerSocket (INetAddress (device, port), cl.exists ("r"));
}
//-----------------------------------------------------------------------------------------
// Atiende las peticiones.
// Cuando hay un nuevo mensaje invocar�a Communicator::eventReceiveMessage
//-----------------------------------------------------------------------------------------
void ArithmeticServer::run ()
noexcept(false)
{
CommandLine& cl (CommandLine::instantiate ());
a_communicator.attach (a_serverSocket);
a_communicator.setDelay (cl.getIntegerValue ("d"));
CongestionController::instantiate ().setLimit (cl.getIntegerValue ("limit"));
a_communicator.accept ();
}
//-----------------------------------------------------------------------------------------
// Manejador de peticiones.
// Calcular�la operacin solicitada y devolver�el resultado.
//
// clientSocket: Socket cliente por el que podemos responder a la peticin.
// transport: Instancia del transporto que ha interpretado el mensaje (getMessage).
//-----------------------------------------------------------------------------------------
void MyCommunicator::eventReceiveMessage (ClientSocket& clientSocket, const DataBlock& data)
noexcept(false)
{
LOGMETHOD (TraceMethod tm ("MyCommunicator", "eventReceiveMessage", ANNA_FILE_LOCATION));
static int messageCounter = 0;
static int successCounter = 0;
int value;
CongestionController& congestionController = CongestionController::instantiate ();
messageCounter ++;
if (congestionController.getAdvice (clientSocket) == CongestionController::Advice::Discard)
return;
successCounter ++;
int random = rand () % (a_delay / 10);
int sign = rand () % 2;
if (sign == 0)
random *= -1;
anna::functions::sleep (a_delay + random);
a_request.decode (data);
a_response.x = a_request.x;
a_response.y = a_request.y;
switch (a_response.op = a_request.op) {
case '+':
a_response.result = a_request.x + a_request.y;
break;
case '-':
a_response.result = a_request.x - a_request.y;
break;
case '*':
a_response.result = a_request.x * a_request.y;
break;
case '/':
a_response.result = (a_request.y != 0) ? (a_request.x / a_request.y): 0;
break;
}
"%d %c %d = %d", a_request.x, a_request.op, a_request.y, a_response.result
);
);
try {
clientSocket.send (a_response.code ());
}
catch (Exception& ex) {
ex.trace ();
}
}

El siguiente ejemplo muestra un cliente correspondiente al servidor anterior, que lanza un numero determinado de peticiones por segundo.

#include <iostream>
#include <string.h>
#include <anna.h>
#include <anna/comm/h>
#include <anna.timex.Engine.h>
#include <anna.timex.Clock.h>
#include <test.Response.h>
#include <test.Request.h>
class Sender : public timex::Clock {
public:
Sender () : Clock ("Sender", 250), a_messageBySecond (0), a_nquarter (0) {;}
void setMessageBySecond (const int messageBySecond) { a_messageBySecond = messageBySecond; }
private:
int a_messageBySecond;
int a_nquarter;
test::Request a_request;
void tick () noexcept(false);
};
//-----------------------------------------------------------------------------------------
// Define el comunicador de nuestra aplicacin.
//
// Las peticiones y respuestas van codificadas mediante un comm::Codec pero podriamos
// haber utilizado cualquier otro medio de codificacin ya que la capa de transporte
// es totalmente independiente del contenido del mensaje.
//-----------------------------------------------------------------------------------------
class MyCommunicator : public Communicator {
public:
MyCommunicator () : Communicator () {;}
private:
test::Response a_response;
void eventReceiveMessage (ClientSocket &, const DataBlock&)
noexcept(false);
};
class HeavyClient : public anna::comm::Application {
public:
HeavyClient ();
Server* getServer () const { return a_server; }
private:
MyCommunicator a_communicator;
timex::Engine a_timeController;
Sender a_sender;
Server* a_server;
void initialize () noexcept(false);
void run () noexcept(false);
};
using namespace std;
int main (int argc, const char** argv)
{
CommandLine& commandLine (CommandLine::instantiate ());
HeavyClient app;
srand (time (NULL));
try {
commandLine.initialize (argv, argc);
commandLine.verify ();
Logger::initialize ("arithmeticClient", new TraceWriter ("file.trace", 1048000));
app.start ();
}
catch (Exception& ex) {
cout << ex.asString () << endl;
}
return 0;
}
HeavyClient::HeavyClient () :
Application ("arithmeticClient", "Cliente de operaciones aritmeticas", "1.0"),
a_communicator (),
a_timeController ((Millisecond)10000, (Millisecond)250)
{
CommandLine& commandLine (CommandLine::instantiate ());
commandLine.add ("p", CommandLine::Argument::Mandatory, "Puerto en el que el servidor atiende respuestas.");
commandLine.add ("a", CommandLine::Argument::Mandatory, "Direccin IP Puerto en el que el servidor atiende respuestas.");
commandLine.add ("n", CommandLine::Argument::Mandatory, "Numero de mensajes por segundo");
}
//-----------------------------------------------------------------------------------------
// Inicializa las conexiones usadas por la aplicacin.
//
// Primero establece los datos para la conexin con el servidor aritm�ico y luego
// establece los datos del socket servidor necesario para atender respuestas y aceptar
// nuevas conexiones de procesos clientes (que no ser�el caso).
// Configura el men para que trabaje con el comunicador de esta aplicacin.
//-----------------------------------------------------------------------------------------
void HeavyClient::initialize ()
noexcept(false)
{
CommandLine& cl (CommandLine::instantiate ());
Network& network = Network::instantiate ();
Host* host = network.find ("host000");
host->assign (network.find (Device::asAddress (cl.getValue ("a"))));
a_server = host->createServer ("rawServer", cl.getIntegerValue ("p"), true);
a_sender.setMessageBySecond (cl.getIntegerValue ("n"));
}
//-----------------------------------------------------------------------------------------
// Activa el reloj que dara el pulso para enviar los mensajes al servidor y comienza a
// atender las peticiones.
//-----------------------------------------------------------------------------------------
void HeavyClient::run ()
noexcept(false)
{
a_timeController.activate (a_sender);
a_communicator.accept ();
}
//-----------------------------------------------------------------------------------------
// Manejador de respuesta.
//
// clientSocket: Socket cliente por el que podemos responder a la peticin.
// transport: Instancia del transporto que ha interpretado el mensaje (getMessage).
//-----------------------------------------------------------------------------------------
void MyCommunicator::eventReceiveMessage (ClientSocket&, const DataBlock& data)
noexcept(false)
{
a_response.decode (data);
"%d %c %d = %d", a_response.x, a_response.op, a_response.y, a_response.result
);
}
void Sender::tick ()
noexcept(false)
{
Server* server = static_cast <HeavyClient&> (anna::app::functions::getApp ()).getServer ();
Communicator* communicator = anna::app::functions::component <Communicator> (ANNA_FILE_LOCATION);
int maxn = a_messageBySecond / 4;
if (++ a_nquarter == 4) {
maxn += a_messageBySecond % 4;
a_nquarter = 0;
}
if (maxn == 0)
return;
maxn = rand () % maxn;
for (int n = 0; n < maxn; n ++) {
a_request.op = '+';
a_request.x = rand () % 1000;
a_request.y = rand () % 1000;
try {
server->send (a_request.code ());
}
catch (RuntimeException& ex) {
ex.trace ();
break;
}
}
}

El ejecutable debera enlazarse con las librerias:

El Packet Header es anna/comm/h