summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJaromil <jaromil@dyne.org>2012-04-17 16:39:13 (GMT)
committer Jaromil <jaromil@dyne.org>2012-04-17 16:39:13 (GMT)
commit992a8b37a34e2a43439b4b5df5e81cc65e31844e (patch)
tree2a55c49f4667ab7c1c956a9de429e769671a4186
parent30b84b7a60a25aa218c378efcb47732d4d0926ac (diff)
code cleanup and network debug (WIP)
-rw-r--r--src/hdsync_cli.cpp693
1 files changed, 387 insertions, 306 deletions
diff --git a/src/hdsync_cli.cpp b/src/hdsync_cli.cpp
index 88109a8..915ced3 100644
--- a/src/hdsync_cli.cpp
+++ b/src/hdsync_cli.cpp
@@ -27,7 +27,7 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
-#include <netdb.h>
+//#include <netdb.h>
#include <errno.h>
@@ -39,6 +39,7 @@
#include <avremote.h>
#include <parsers.h>
#include <zmq.h>
+#include <zmq_utils.h>
// our exit codes are shell style: 1 is error, 0 is success
#define ERR 1
@@ -72,13 +73,396 @@ char IPv4[24];
// 0mq
void *zcontext;
void *sock_in = NULL;
-void *sock_out = NULL;
+void *sock_shout = NULL;
char pgmaddr[64];
zmq_msg_t request;
// we use only getopt, no _long
static const char *short_options = "-hvts:p:";
+void cmdline(int argc, char **argv);
+
+
+int get_address();
+
+
+int shout(const char *message, int flags) {
+ int res;
+ // zmq_connect(sock_out, pgmaddr);
+ zmq_msg_init_size (&request, strlen(message));
+ memcpy (zmq_msg_data (&request), message, strlen(message));
+ printf ("Shouting \"%s\"\n", message);
+ res = zmq_send (sock_shout, &request, flags);
+ zmq_msg_close (&request);
+ // zmq_close(sock_out);
+ return(res);
+}
+
+// extract a string from a message received:
+// PREFIX;MESSAGE;
+// message should be a char of 512 bytes
+int expect(const char *prefix, char *message, int flags) {
+ zmq_msg_t reply;
+ char res[512];
+ char *p, *pp;
+ int pfxlen = strlen(prefix);
+
+ printf("Expecting prefix %s\n",prefix);
+
+ // listen for answers
+ zmq_msg_init (&reply);
+ zmq_recv (sock_in, &reply, flags);
+ // printf ("Received response\n");
+ snprintf(res,511,"%s",zmq_msg_data(&reply));
+ zmq_msg_close (&reply);
+
+ // parse the response and if ACK insert it in uniq list
+ if( strncmp(res,prefix,pfxlen) == 0) {
+ // quick and secure string parsing
+ p = res; while(*p != ';') p++; p++;
+ pp = p; while(*pp != ';') pp++; *pp='\0';
+
+ // *p has the ip string
+ snprintf(message,511,"%s",p);
+ printf("Expected prefix %s found: %s\n",prefix,message);
+ return(0);
+ }
+
+ return(1);
+}
+
+int handshake() {
+ int request_nbr;
+ char message[256];
+ int listindex = 0;
+ int listmax = 0;
+ char res[512];
+ int c, i;
+
+ if(chanID==1) { // first channel is the one to offer
+
+ ////////
+ // OFFER
+ //////////////////////////////
+ printf ("Offering on %s from address %s\n",pgmaddr, IPv4);
+
+ listmax = chanTOT; // number of desired peers
+ listmax--; // we count outrselves in
+ printf("looking to connect %u peers\n",listmax);
+ // allocate space for listener array
+ listeners = (char**) calloc(listmax,sizeof(char*));
+ for(i=0; i<listmax; i++) listeners[i] = (char*)calloc(16, sizeof(char));
+
+ // offer subscribes to all broadcasts by default
+ // somehow listening to ACK only doesn't works
+ snprintf(message,255,"OFFER;%s;",IPv4);
+ while(1) {
+
+ shout(message, 0);
+
+ if( expect("ACK",res, 0) == 0) {
+ printf("Got ACK from %s\n",res);
+ // parse the response and if ACK insert it in uniq list
+ for(c=0; c<listindex; c++)
+ if(strncmp(listeners[c], res, 16) == 0)
+ break; // found a duplicate
+ if(c==listindex) { // there was no duplicate
+ snprintf(listeners[c], 16, "%s", res);
+ printf("New listener: %s\n",listeners[c]);
+ listindex++;
+ }
+
+ if(listindex==listmax) { // goal reached
+ printf("Sending ready signals to listeners\n");
+ ///////////////////////////////////////////////
+ shout("READY", 0);
+ break; // break offer loop
+ } // if listmax reached
+
+ } // if ACK response
+
+ zmq_sleep(1);
+
+ } // offer while loop
+
+ } else { // all other channels listen
+
+ /////////
+ // LISTEN
+
+ printf ("Listening on %s from address %s\n",pgmaddr, IPv4);
+
+ snprintf(message,255,"ACK;%s;",IPv4);
+
+ while (1) {
+ // Wait for offer
+ while( expect("OFFER", res, 0) !=0); // blocking
+ // reply with ACK
+ shout(message, 0);
+
+ while( expect("READY", res, 0) !=0); // blocking
+ printf ("Received ready signal\n");
+ break; // quit loop on ready signal
+
+ }
+
+ }
+
+ return(1);
+}
+
+int main(int argc, char **argv) {
+ upnp_t *upnp;
+ int found;
+
+ //////////
+ ///// INIT
+
+ cmdline(argc, argv);
+
+ if(!filename[0]) {
+ fprintf(stderr,"not enough args specified on commandline, see help.");
+ exit(ERR);
+ }
+
+ fprintf(stderr,"HDSync starting for channel %u of %u\n",chanID, chanTOT);
+ fprintf(stderr,"will sync and play video: %s\n", filename);
+
+ upnp = create_upnp();
+
+ // no server specified, force localhost
+ if(!server[0]) sprintf(server,"localhost");
+
+ // commandline or detection found explicit addresses
+ snprintf(upnp->hostname, MAX_HOSTNAME_SIZE-1,"%s",server);
+ upnp->port = port;
+
+ if(!dryrun) {
+ if ( connect_upnp (upnp) < 0 ) {
+ fprintf(stderr,"can't connect to %s:%u: operation aborted.\n", server, port);
+ free_upnp(upnp);
+ exit(ERR);
+ } else {
+ fprintf(stderr,"UPNP connected to %s:%u\n",server, port);
+ close(upnp->sockfd);
+ upnp->sockfd = 0;
+ }
+ }
+
+ if( ! get_address() ) {
+ fprintf(stderr,"Error getting own address.\n");
+ exit(ERR);
+ }
+
+ // initialize 0mq
+ zcontext = zmq_init (1);
+
+ // create broadcast socket
+ sock_shout = zmq_socket (zcontext, ZMQ_PUB);
+ zmq_connect(sock_shout, pgmaddr);
+
+ // create input socket
+ sock_in = zmq_socket (zcontext, ZMQ_SUB);
+ zmq_bind (sock_in, pgmaddr);
+ zmq_setsockopt(sock_in,ZMQ_SUBSCRIBE,"",0);
+
+
+ /// END OF INIT
+ ////////////////
+
+
+ ////////////////////
+ if(! handshake()) {
+ // no error return so far, just running endless untill contact is established
+ fprintf(stderr,"error establishing contact with all desired channels, aborting.\n");
+ exit(ERR);
+ }
+
+ printf("Handshake successful\n");
+
+ //////////////////////
+ if(dryrun) { // DRYRUN
+ printf("Dry run\n");
+
+ // testing sync with no real playback
+ zmq_sleep(1);
+
+ if(chanID==1) { // offer should send sync
+
+ printf("Prepare to offer sync\n");
+
+ zmq_sleep(3);
+
+ shout("SYNC;GOGO;", 0);
+
+ } else { // listener should prepare to receive sync
+
+ printf("Prepare to receive sync\n");
+
+ // TODO
+ zmq_sleep(6);
+ }
+
+ //////////////////////
+
+ } else { // RUN FOR REAL
+
+ //////////////////////
+
+ // load, play and pause in sequence
+ // break and reopen connection in between
+
+
+ if(chanID==1) { // offer should send sync
+ zmq_sleep(1);
+ // create output socket
+ // sock_out = zmq_socket (zcontext, ZMQ_PUB);
+ // zmq_connect(sock_out, pgmaddr);
+ // prepare signal
+ zmq_msg_init_size (&request, 4);
+ memcpy (zmq_msg_data (&request), "SYNC", 4);
+
+ // stop player
+ connect_upnp(upnp);
+ render_upnp(upnp,"Stop","");
+ send_upnp(upnp);
+ recv_upnp(upnp, 1000);
+ close(upnp->sockfd);
+ upnp->sockfd = 0;
+
+
+ zmq_sleep(1);
+
+
+ // prepare player in position
+ render_uri_meta(upnp,filename);
+ render_upnp(upnp,"SetAVTransportURI", upnp->meta);
+ send_upnp(upnp);
+ recv_upnp(upnp, 1000);
+ close(upnp->sockfd);
+ upnp->sockfd = 0;
+
+ zmq_sleep(1);
+
+ // play
+ connect_upnp(upnp);
+ render_upnp(upnp,"Play","<Speed>1</Speed>");
+ send_upnp(upnp);
+ recv_upnp(upnp, 1000);
+ close(upnp->sockfd);
+ upnp->sockfd = 0;
+ // pause
+ connect_upnp(upnp);
+ render_upnp(upnp,"Pause","");
+ send_upnp(upnp);
+ recv_upnp(upnp, 1000);
+ close(upnp->sockfd);
+ upnp->sockfd = 0;
+
+ zmq_sleep(1);
+
+ // prepare upnp play message
+ connect_upnp(upnp);
+ render_upnp(upnp,"Play","<Speed>1</Speed>");
+
+ // send sync signal
+ // zmq_send (sock_out, &request, 0);
+
+ // start player
+ send_upnp(upnp);
+ recv_upnp(upnp, 1000);
+
+ } else { // this is a listener
+
+ // create input socket
+ sock_in = zmq_socket (zcontext, ZMQ_SUB);
+ zmq_bind (sock_in, pgmaddr);
+
+
+ // stop player
+ connect_upnp(upnp);
+ render_upnp(upnp,"Stop","");
+ send_upnp(upnp);
+ recv_upnp(upnp, 1000);
+ close(upnp->sockfd);
+ upnp->sockfd = 0;
+
+
+ zmq_sleep(1);
+
+
+ // prepare player in position
+ render_uri_meta(upnp,filename);
+ render_upnp(upnp,"SetAVTransportURI", upnp->meta);
+ send_upnp(upnp);
+ recv_upnp(upnp, 1000);
+ close(upnp->sockfd);
+ upnp->sockfd = 0;
+
+ zmq_sleep(1);
+
+ // play
+ connect_upnp(upnp);
+ render_upnp(upnp,"Play","<Speed>1</Speed>");
+ send_upnp(upnp);
+ recv_upnp(upnp, 1000);
+ close(upnp->sockfd);
+ upnp->sockfd = 0;
+ // pause
+ connect_upnp(upnp);
+ render_upnp(upnp,"Pause","");
+ send_upnp(upnp);
+ recv_upnp(upnp, 1000);
+ close(upnp->sockfd);
+ upnp->sockfd = 0;
+
+ zmq_sleep(1);
+
+ // prepare upnp play message
+ connect_upnp(upnp);
+ render_upnp(upnp,"Play","<Speed>1</Speed>");
+
+ // Wait for next request from client
+ zmq_msg_init (&request);
+ zmq_recv (sock_in, &request, 0);
+ if(strncmp((char*)zmq_msg_data(&request),"SYNC",4)==0) {
+ // start player
+ send_upnp(upnp);
+ recv_upnp(upnp, 1000);
+ }
+
+ } // if(offer|listen)
+
+ zmq_msg_close (&request);
+ free_upnp(upnp);
+
+ } // if(test|real)
+
+ fprintf(stderr,"Software shutdown\n");
+
+ // if(sock_out) {
+ // zmq_close (sock_out);
+ // sock_out = NULL;
+ // }
+ // if(sock_in) {
+ // zmq_close (sock_in);
+ // sock_in = NULL;
+ // }
+
+ // somehow offer blocks here
+ zmq_term (zcontext);
+
+ if(listeners) {
+ for(int c; c<chanTOT; c++)
+ free(listeners[c]);
+ free(listeners);
+ }
+
+ fprintf(stderr,"Done.\n");
+ exit(0);
+}
+
+
void cmdline(int argc, char **argv) {
filename[0] = 0;
server[0] = 0;
@@ -156,14 +540,7 @@ void cmdline(int argc, char **argv) {
} while(res != -1);
}
-int handshake() {
- int request_nbr;
- zmq_msg_t reply;
- char message[256];
- int listindex = 0;
- int listmax = 0;
- char res[128];
-
+int get_address() {
// discover own IP address
struct ifaddrs * ifAddrStruct=NULL;
struct ifaddrs * ifa=NULL;
@@ -194,301 +571,5 @@ int handshake() {
printf("Error: ethernet interface %s is not configured\n",ETHDEV);
return(0);
}
-
-
- // create output socket
- sock_out = zmq_socket (zcontext, ZMQ_PUB);
- zmq_connect(sock_out, pgmaddr);
-
- // create input socket
- sock_in = zmq_socket (zcontext, ZMQ_SUB);
- zmq_bind (sock_in, pgmaddr);
-
- if(chanID==1) { // first channel is the one to offer
- int i;
-
- ////////
- // OFFER
- //////////////////////////////
- printf ("Offering on %s from address %s\n",pgmaddr, IPv4);
-
- listmax = chanTOT; // number of desired peers
- listmax--; // we count outrselves in
- printf("looking to connect %u peers\n",listmax);
- // allocate space for listener array
- listeners = (char**) calloc(listmax,sizeof(char*));
- for(i=0; i<listmax; i++) listeners[i] = (char*)calloc(16, sizeof(char));
-
- // offer subscribes to all broadcasts by default
- // somehow listening to ACK only doesn't works
- zmq_setsockopt(sock_in,ZMQ_SUBSCRIBE,"",0);
- snprintf(message,255,"OFFER;%s;",IPv4);
- while(1) {
-
- // broadcast offer
- zmq_msg_init_size (&request, strlen(message));
- memcpy (zmq_msg_data (&request), message, strlen(message));
- // printf ("Sending offer...\n", request_nbr);
- zmq_send (sock_out, &request, 0);
- zmq_msg_close (&request);
-
- sleep(1);
-
- // listen for answers
- zmq_msg_init (&reply);
- zmq_recv (sock_in, &reply, 0);
- // printf ("Received response\n");
- snprintf(res,127,"%s",zmq_msg_data(&reply));
- zmq_msg_close (&reply);
- // parse the response and if ACK insert it in uniq list
- if( strncmp(res,"ACK",3) == 0) {
- int c;
- char *p, *pp;
- // quick and secure string parsing
- p = res; while(*p != ';') p++; p++;
- pp = p; while(*pp != ';') pp++; *pp='\0';
-
- // *p has the ip string
- for(c=0; c<listindex; c++)
- if(strncmp(listeners[c], p, 16) == 0)
- break; // found a duplicate
- if(c==listindex) { // there was no duplicate
- snprintf(listeners[c], 16, "%s", p);
- printf("New listener: %s\n",p);
- listindex++;
- }
- if(listindex==listmax) { // goal reached
- printf("Sending ready signals to listeners:\n");
- ///////////////////////////////////////////////
- for(c=0;c<listmax;c++) {
- char tmpaddr[256];
- snprintf(tmpaddr,255, "tcp://%s:%s",listeners[c],PGMPORT);
- printf (" %s\n", tmpaddr);
- zmq_connect(sock_out, tmpaddr);
- zmq_msg_init_size (&request, 5);
- memcpy (zmq_msg_data (&request), "READY", 5);
- zmq_send (sock_out, &request, 0);
- zmq_msg_close (&request);
- }
- break; // break offer loop
- } // if listmax reached
-
- } // if ACK response
-
- } // offer while loop
-
- } else { // all other channels listen
-
- /////////
- // LISTEN
-
-
- printf ("Listening on %s from address %s\n",pgmaddr, IPv4);
-
- // listen subscribes only to offers and ready
- zmq_setsockopt(sock_in,ZMQ_SUBSCRIBE,"OFFER",5);
- zmq_setsockopt(sock_in,ZMQ_SUBSCRIBE,"READY",5);
-
-
- snprintf(message,255,"ACK;%s;",IPv4);
-
- while (1) {
- // Wait for next request from client
- zmq_msg_init (&request);
- zmq_recv (sock_in, &request, 0);
-
- snprintf(res,127,"%s",zmq_msg_data(&request));
- zmq_msg_close (&request);
- if(strncmp(res,"READY",5) == 0) {
- printf ("Received ready signal\n");
- break; // quit loop on ready signal
- }
- sleep (1);
-
- // Send reply back to client
- zmq_msg_init_size (&reply, strlen(message));
- memcpy (zmq_msg_data (&reply), message, strlen(message));
- zmq_send (sock_out, &reply, 0);
- zmq_msg_close (&reply);
-
- }
-
- }
-
- if(sock_out) {
- zmq_close (sock_out);
- sock_out = NULL;
- }
- if(sock_in) {
- zmq_close (sock_in);
- sock_in = NULL;
- }
-
return(1);
}
-
-int main(int argc, char **argv) {
- upnp_t *upnp;
- int found;
-
- cmdline(argc, argv);
-
- if(!filename[0]) {
- fprintf(stderr,"not enough args specified on commandline, see help.");
- exit(ERR);
- }
-
- fprintf(stderr,"HDSync starting for channel %u of %u\n",chanID, chanTOT);
- fprintf(stderr,"will sync and play video: %s\n", filename);
-
- upnp = create_upnp();
-
- // no server specified, force localhost
- if(!server[0]) sprintf(server,"localhost");
-
- // commandline or detection found explicit addresses
- snprintf(upnp->hostname, MAX_HOSTNAME_SIZE-1,"%s",server);
- upnp->port = port;
-
- if(!dryrun) {
- if ( connect_upnp (upnp) < 0 )
- {
- fprintf(stderr,"can't connect to %s:%u: operation aborted.\n", server, port);
- free_upnp(upnp);
- exit(ERR);
- }
- }
-
- // initialize 0mq
- zcontext = zmq_init (1);
-
- if(! handshake()) {
- // might never get here since while(1) tries endlessly
- fprintf(stderr,"error establishing contact with all desired channels, aborting.\n");
- exit(ERR);
- }
-
- //////////////////////
- if(dryrun) { // DRYRUN
- // testing sync with no real playback
- sleep(1);
-
- if(chanID==1) { // offer should send sync
- sleep(3);
- // create output socket
- sock_out = zmq_socket (zcontext, ZMQ_PUB);
- zmq_connect(sock_out, pgmaddr);
- // broadcast offer
- zmq_msg_init_size (&request, 4);
- memcpy (zmq_msg_data (&request), "SYNC", 4);
- // printf ("Sending offer...\n", request_nbr);
- zmq_send (sock_out, &request, 0);
- fprintf(stderr,"SYNC\n");
- zmq_msg_close (&request);
-
- } else { // listener should prepare to receive sync
-
- // create input socket
- sock_in = zmq_socket (zcontext, ZMQ_SUB);
- zmq_bind (sock_in, pgmaddr);
- zmq_setsockopt(sock_in,ZMQ_SUBSCRIBE,"",0);
- // Wait for next request from client
- zmq_msg_init (&request);
- zmq_recv (sock_in, &request, 0);
- if(strncmp((char*)zmq_msg_data(&request),"SYNC",4)==0) {
- fprintf(stderr,"SYNC\n");
- }
- zmq_msg_close (&request);
-
- }
-
- //////////////////////
- } else { // RUN FOR REAL
-
-
- // Prepare
-
- // load, play and pause in sequence
- // break and reopen connection in between
-
- // was connected already
-
- // load
- render_uri_meta(upnp,filename);
- render_upnp(upnp,"SetAVTransportURI", upnp->meta);
-
- // must re-connect socket between commands
- send_upnp(upnp);
- recv_upnp(upnp, 1000);
- close(upnp->sockfd);
- upnp->sockfd = 0;
-
- connect_upnp(upnp);
- render_upnp(upnp,"Play","<Speed>1</Speed>");
-
- send_upnp(upnp);
- recv_upnp(upnp, 1000);
- close(upnp->sockfd);
- upnp->sockfd = 0;
-
- connect_upnp(upnp);
- render_upnp(upnp,"Pause","");
- send_upnp(upnp);
- recv_upnp(upnp, 1000);
-
- // START SYNC
-
- // was connected already
- render_upnp(upnp,"Play","<Speed>1</Speed>");
- send_upnp(upnp);
- recv_upnp(upnp, 1000);
-
- /*
- case 'g': // dump a parsable full state of the device
- render_upnp(upnp,"GetTransportInfo","");
- parser = GetTransportInfo;
-
- break;
-
- case 'm': // set the playmode:
- // "NORMAL", "REPEAT_ONE", "REPEAT_ALL", "RANDOM"
- {
- char tmp[256];
- snprintf(tmp,255,"<NewPlayMode>%s</NewPlayMode>",filename);
- render_upnp(upnp,"SetPlayMode",tmp);
- }
- break;
-
- case 'j': // jump aka seek
- // <SeekMode> and <SeekTarget>
- {
- char tmp[512];
- snprintf(tmp,511,"<Unit>REL_TIME</Unit><Target>%s</Target>",filename);
- render_upnp(upnp,"Seek",tmp);
- }
- break;
- */
-
- free_upnp(upnp);
- }
-
- if(sock_out) {
- zmq_close (sock_out);
- sock_out = NULL;
- }
- if(sock_in) {
- zmq_close (sock_in);
- sock_in = NULL;
- }
-
- zmq_term (zcontext);
-
- if(listeners) {
- for(int c; c<chanTOT; c++)
- free(listeners[c]);
- free(listeners);
- }
-
-
- exit(0);
-}