Logo Search packages:      
Sourcecode: zeromq version File versions  Download package

pgmsend.c

/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
 *
 * Simple sender using the PGM transport.
 *
 * Copyright (c) 2006-2007 Miru Limited.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */


#include <errno.h>
#include <getopt.h>
#include <locale.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>

#include <glib.h>

#ifdef G_OS_UNIX
#     include <netdb.h>
#     include <unistd.h>
#     include <arpa/inet.h>
#     include <netinet/in.h>
#     include <sys/socket.h>
#     include <sys/time.h>
#endif

#include <pgm/pgm.h>
#include <pgm/backtrace.h>
#include <pgm/log.h>


/* typedefs */

/* globals */

static int g_port = 0;
static const char* g_network = "";
static gboolean g_multicast_loop = FALSE;
static int g_udp_encap_port = 0;

static int g_max_tpdu = 1500;
static int g_max_rte = 400*1000;
static int g_sqns = 100;

static gboolean g_fec = FALSE;
static int g_k = 64;
static int g_n = 255;

static pgm_transport_t* g_transport = NULL;

static gboolean create_transport (void);


G_GNUC_NORETURN static void
usage (const char* bin)
{
      fprintf (stderr, "Usage: %s [options] message\n", bin);
      fprintf (stderr, "  -n <network>    : Multicast group or unicast IP address\n");
      fprintf (stderr, "  -s <port>       : IP port\n");
      fprintf (stderr, "  -p <port>       : Encapsulate PGM in UDP on IP port\n");
      fprintf (stderr, "  -r <rate>       : Regulate to rate bytes per second\n");
      fprintf (stderr, "  -f <type>       : Enable FEC with either proactive or ondemand parity\n");
      fprintf (stderr, "  -k <k>          : Configure Reed-Solomon code (n, k)\n");
      fprintf (stderr, "  -g <n>\n");
      fprintf (stderr, "  -l              : Enable multicast loopback and address sharing\n");
      fprintf (stderr, "  -i              : List available interfaces\n");
      exit (1);
}

int
main (
      int   argc,
      char   *argv[]
      )
{
      GError* err = NULL;

      setlocale (LC_ALL, "");

      log_init ();
      if (!pgm_init (&err)) {
            g_error ("Unable to start PGM engine: %s", err->message);
            g_error_free (err);
            return EXIT_FAILURE;
      }

/* parse program arguments */
      const char* binary_name = strrchr (argv[0], '/');
      int c;
      while ((c = getopt (argc, argv, "s:n:p:r:f:k:g:lih")) != -1)
      {
            switch (c) {
            case 'n':   g_network = optarg; break;
            case 's':   g_port = atoi (optarg); break;
            case 'p':   g_udp_encap_port = atoi (optarg); break;
            case 'r':   g_max_rte = atoi (optarg); break;

            case 'f':   g_fec = TRUE; break;
            case 'k':   g_k = atoi (optarg); break;
            case 'g':   g_n = atoi (optarg); break;

            case 'l':   g_multicast_loop = TRUE; break;

            case 'i':
                  pgm_if_print_all ();
                  return EXIT_SUCCESS;

            case 'h':
            case '?':
                  usage (binary_name);
            }
      }

      if (g_fec && ( !g_k || !g_n )) {
            puts ("Invalid Reed-Solomon parameters.");
            usage (binary_name);
      }

/* setup signal handlers */
      signal (SIGSEGV, on_sigsegv);
#ifdef SIGHUP
      signal (SIGHUP, SIG_IGN);
#endif

      if (create_transport ())
      {
            while (optind < argc) {
                  const PGMIOStatus status = pgm_send (g_transport, argv[optind], strlen(argv[optind]) + 1, NULL);
                    if (PGM_IO_STATUS_NORMAL != status) {
                        g_warning ("pgm_send failed.");
                    }
                  optind++;
            }
      }

/* cleanup */
      if (g_transport) {
            pgm_transport_destroy (g_transport, TRUE);
            g_transport = NULL;
      }
      pgm_shutdown ();
      return EXIT_SUCCESS;
}

static gboolean
create_transport (void)
{
      struct pgm_transport_info_t* res = NULL;
      GError* err = NULL;

/* parse network parameter into transport address structure */
      char network[1024];
      sprintf (network, "%s", g_network);
      if (!pgm_if_get_transport_info (network, NULL, &res, &err)) {
            g_error ("parsing network parameter: %s", err->message);
            g_error_free (err);
            return FALSE;
      }
/* create global session identifier */
      if (!pgm_gsi_create_from_hostname (&res->ti_gsi, &err)) {
            g_error ("creating GSI: %s", err->message);
            g_error_free (err);
            pgm_if_free_transport_info (res);
            return FALSE;
      }
      if (g_udp_encap_port) {
            res->ti_udp_encap_ucast_port = g_udp_encap_port;
            res->ti_udp_encap_mcast_port = g_udp_encap_port;
      }
      if (g_port)
            res->ti_dport = g_port;
      if (!pgm_transport_create (&g_transport, res, &err)) {
            g_error ("creating transport: %s", err->message);
            g_error_free (err);
            pgm_if_free_transport_info (res);
            return FALSE;
      }
      pgm_if_free_transport_info (res);

/* set PGM parameters */
      pgm_transport_set_send_only (g_transport, TRUE);
      pgm_transport_set_max_tpdu (g_transport, g_max_tpdu);
      pgm_transport_set_txw_sqns (g_transport, g_sqns);
      pgm_transport_set_txw_max_rte (g_transport, g_max_rte);
      pgm_transport_set_multicast_loop (g_transport, g_multicast_loop);
      pgm_transport_set_hops (g_transport, 16);
      pgm_transport_set_ambient_spm (g_transport, pgm_secs(30));
      guint spm_heartbeat[] = { pgm_msecs(100), pgm_msecs(100), pgm_msecs(100), pgm_msecs(100), pgm_msecs(1300), pgm_secs(7
), pgm_secs(16), pgm_secs(25), pgm_secs(30) };
      pgm_transport_set_heartbeat_spm (g_transport, spm_heartbeat, G_N_ELEMENTS(spm_heartbeat));
      if (g_fec) {
            pgm_transport_set_fec (g_transport, 0, TRUE, TRUE, g_n, g_k);
      }

/* assign transport to specified address */
      if (!pgm_transport_bind (g_transport, &err)) {
            g_error ("binding transport: %s", err->message);
            g_error_free (err);
            pgm_transport_destroy (g_transport, FALSE);
            g_transport = NULL;
            return FALSE;
      }
      return TRUE;
}

/* eof */

Generated by  Doxygen 1.6.0   Back to index