root/dbeng/dbengrep.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. dbengrep_init
  2. dbengrep_end
  3. dbengrep_recv
  4. dbengrep_send
  5. dbengrep_rll_delete_all
  6. dbengrep_rll_load
  7. dbengrep_recv_write
  8. dbengrep_recv_delete
  9. dbengrep_recv_pack
  10. dbengrep_recv_get_fpos
  11. dbengrep_rll_add
  12. dbengrep_rll_find
  13. dbengrep_rll_delete
  14. dbengrep_sll_load
  15. dbengrep_sll_add
  16. dbengrep_sll_delete
  17. dbengrep_sll_find
  18. dbengrep_sll_delete_all
  19. dbengrep_set_active
  20. dbengrep_send_recv
  21. dbengrep_connect

/* Bbuuzzb replication management module.
   Rick Smereka, Copyright (C) 2002-2004.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, get a copy via the Internet at
   http://gnu.org/copyleft/gpl.html or write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston,
   MA 02111-1307 USA

   You can contact the author via email at rsmereka@future-lab.com

   This module should only be linked with the Bbuuzzb database server.
   Currently, replication is only available for the TCP/IP IPC method. This
   module interfaces with the 'socloc' API to obtain the details of
   other running Bbuuzzb servers.

   This module manages two link lists. The first is 'dbengrep_servers'
   which contains a list of currently available 'Bbuuzzb' servers
   (not including this one). The second link list is 'dbeng_rep'
   which is contained within each table in use ('dbeng_table').
   When 'dbengrep_init' is called, the 'dbengrep_servers' link
   list will be loaded and each current table in use will have
   its replication details loaded (from the catalog) into
   the 'dbeng_rep' link list. Once the replication initialization
   is complete, all non-system tables will have their
   replication details automatically loaded when the table is
   opened.

   When a record change or delete is made, the 'Bbuuzzb' servers listed 
   in the replication link list will each receive the corresponding change
   command.

   Replication details are stored in field three of the
   catalog. This field contains a 'sub-field' array
   with each 'sub-field' containing two 'sub-sub-fields'
   Here is the layout:

      Sub-Sub-Field  Description

            1        TCP port of target Bbuuzzb server
            2        logical table name on target server

   Notice that you must always supply the logical table name which
   means that the target table must also be cataloged and the
   catalog flag on all listed servers (and this one) must also be 
   high. The replication flag of this server and every server 
   listed in the replication details must be high also.
   
   Original Linux version. Jan/2002, Rick Smereka

   Ported to Debian Linux. Nov/2002, Rick Smereka

   Changed call to 'connect' to use 'SA' definition (defined in
   'flsocket.h'). Jun/2003, Rick Smereka

   Added a parameter to the function 'dbengrep_rll_load'.
   Modified function 'dbengrep_init' to call 'dbeng_catalog'
   functions to acquire replication list entries.
   Changed all logging calls from 'sys_log' to 'logman'.
   Mar/2004, Rick Smereka */

#include "stdhead.h"
#include "flsocket.h"
#include "dbmess.h"
#include "dbeng.h"
#include "dbengcfg.h"
#include "dbengrep.h"
#include "dbengcat.h"
#include "socloc.h"
#include "sloc.h"

/* module globals */

/* pointer to top of available Bbuuzzb server link list; searched with
   'dbengrep_sll_find' and cleared with 'dbengrep_sll_delete_all' */

struct dbengrep_servers *dbengrep_shead = DBENGREP_SOT_NULL;

/* pointer to destination table; set by 'dbengrep_recv' once the
   destination table has been opened and then used by the
   'dbengrep_recv_write', 'dbengrep_recv_delete' and
   'dbengrep_recv_pack' helpers */

struct dbeng_table *rep_table;

/* this server port number (declared in dbsrv.c); placed in every
   outgoing replication command as the 'from' port */

extern int server_port;

/* pointer to top of 'dbeng_table' list (declared in dbeng.c) */

extern struct dbeng_table *dbeng_head;

/* socket globals (presumably used by the connect/send functions
   further down in this module -- not all of them are referenced in
   the functions above) */

#ifdef OS_WIN32
SOCKET dbengrep_clientSocket;
SOCKADDR_IN dbengrep_sockClientAddr;
LPHOSTENT dbengrep_lpHostEnt;
#endif

#ifdef OS_UNIX
int dbengrep_clientSocket;
struct sockaddr_in dbengrep_sockClientAddr;
struct hostent *dbengrep_lpHostEnt;
#endif

/* 'dbengrep_hostname' and 'dbengrep_port' describe the server currently
   being sent to (logged by 'dbengrep_send' right after it calls
   'dbengrep_set_active'). 'dbengrep_from_tname' and 'dbengrep_from_port'
   are filled in by 'dbengrep_recv' from the last received replication
   command and are compared against each replication entry in
   'dbengrep_send' to refuse sending a change back to its source.
   All four are reset by 'dbengrep_init' and 'dbengrep_end'. */

char dbengrep_hostname[128];      /* host name of replication target */
int dbengrep_port;                /* port of replication target */
char dbengrep_from_tname[128];    /* table name of replication source */
int dbengrep_from_port;           /* port of replication source */

/* private functions */

/* handlers for a received replication command (operate on 'rep_table') */

static int dbengrep_recv_write(char *);
static int dbengrep_recv_delete(char *);
static int dbengrep_recv_pack(void);
static long dbengrep_recv_get_fpos(char *);

/* per-table replication link list ('dbeng_rep') maintenance */

static int dbengrep_rll_add(struct dbeng_table *, char *, int);
static struct dbeng_rep *dbengrep_rll_find(struct dbeng_table *, int);
static int dbengrep_rll_delete(struct dbeng_table *, int);

/* available server link list ('dbengrep_servers') maintenance */

static int dbengrep_sll_load(void);
static int dbengrep_sll_add(char *, int);
static int dbengrep_sll_delete(int);
static struct dbengrep_servers *dbengrep_sll_find(int);
static void dbengrep_sll_delete_all(void);

/* selection of, and communication with, a replication target */

static int dbengrep_set_active(char *, int);
static int dbengrep_send_recv(char *);
static int dbengrep_connect(void);

int dbengrep_init(void)
{
   /* Initialize replication or update replication details.
      The list of other available servers is (re)loaded first, then
      every table currently in use has its replication list rebuilt
      from its catalog details. A table without replication entries
      ('DBENG_NO_REPLICATION_ENTRY') is not an error. The catalog
      details obtained for a table are always released again, including
      on the error paths. Function returns a 'dbeng' code. */

   struct dbeng_table *rov = dbeng_head;
   struct dbengcat_details *det;
   char mname[] = "dbengrep_init";
   int ret, rep_flag;

   logman("%s:enter", mname);

   /* replication must be active */

   (void)dbeng_config_get_replicate_flag(&rep_flag);

   if (!rep_flag)
      {
      dbeng_io_code_log(mname, "replication not active",
                        DBENG_REPLICATION_NOT_ACTIVE);
      return(DBENG_REPLICATION_NOT_ACTIVE);
      }

   /* load or re-load list of other Bbuuzzb servers (excluding this one) */

   if ((ret = dbengrep_sll_load()) != DBENG_OK)
      {
      dbeng_io_code_log(mname, "bad rc from dbengrep_sll_load", ret);
      return(ret);
      }

   /* if no tables in use, we are done */

   if (rov == DBENG_OT_NULL)
      {
      dbeng_io_code_log(mname, "no tables in use", DBENG_OK);
      return(DBENG_OK);
      }

   /* go through list of tables in use and load replication details
      for each table (if present) */

   while(rov != DBENG_OT_NULL)
      {
      /* load catalog details for this table */

      if ((ret = dbeng_catalog_init_table_details(rov->name, FALSE, &det)) !=
         DBENG_OK)
         {
         dbeng_io_code_log(mname, 
                           "bad rc from dbeng_catalog_init_table_details", ret);
         return(ret);
         }

      /* from here on 'det' is owned by this function and must be
         handed back to 'dbeng_catalog_term_table_details' before
         leaving */

      if ((ret = dbeng_catalog_table_details(det)) != DBENG_OK)
         {
         dbeng_io_code_log(mname, "bad rc from dbeng_catalog_table_details",
                           ret);
         (void)dbeng_catalog_term_table_details(&det);
         return(ret);
         }

      ret = dbengrep_rll_load(rov, det->rep_list);

      if (ret != DBENG_OK && ret != DBENG_NO_REPLICATION_ENTRY)
         {
         logman("%s:exit:bad rc[%d] from dbengrep_rll_load[%d]",
                       mname, ret, rov->tid);
         (void)dbeng_catalog_term_table_details(&det);
         return(ret);
         }

      if ((ret = dbeng_catalog_term_table_details(&det)) != DBENG_OK)
         {
         dbeng_io_code_log(mname, "bad rc from "
                           "dbeng_catalog_term_table_details", ret);
         return(ret);
         }

      rov = rov->next;
      }

   /* forget the last replication target and source */

   dbengrep_hostname[0] = dbengrep_from_tname[0] = EOS;
   dbengrep_port = dbengrep_from_port = 0;
   dbeng_io_code_log(mname, "normal exit", DBENG_OK);
   return(DBENG_OK);
}

int dbengrep_end(void)
{
   /* Shut replication down: empty the available server list, empty
      the replication list of every table in use and reset the
      target/source globals. A failure to clear one table is logged
      and the remaining tables are still processed. Function always
      returns 'DBENG_OK'. */

   char mname[] = "dbengrep_end";
   struct dbeng_table *tab = dbeng_head;
   int rc;

   logman("%s:enter", mname);
   dbengrep_sll_delete_all();

   for(; tab != DBENG_OT_NULL; tab = tab->next)
      {
      rc = dbengrep_rll_delete_all(tab);

      if (rc == DBENG_OK)
         continue;

      logman("%s:exit:bad rc[%d] from dbengrep_rll_delete_all,"
                    "tid=%d", mname, rc, tab->tid);
      }

   dbengrep_from_tname[0] = EOS;
   dbengrep_hostname[0] = EOS;
   dbengrep_from_port = 0;
   dbengrep_port = 0;

   dbeng_io_code_log(mname, "normal exit", DBENG_OK);
   return(DBENG_OK);
}

int dbengrep_recv(char *com)
{
   /* Process a received replication command. Received replication requests
      will only be processed if the replication flag is high.

      The command words parsed here are:

         1  replication code
         2  port of the sending server (stored in 'dbengrep_from_port')
         3  table name on the sending server (stored in
            'dbengrep_from_tname')
         4  destination table name on this server

      Any further word is interpreted by the handler for the
      replication code. The destination table is opened, the handler
      is run and the table is closed again.

      Words one to three are copied into fixed size buffers, their
      length (as reported by 'command_wordlen') is checked first and a
      command carrying an oversized word is refused with
      'DBENG_INVALID_FUNCTION'.

      Function returns a 'dbeng' code. */

   char mname[] = "dbengrep_recv", *tname, tmp[10];
   int rcode, ret, len, nwords, tid, session_flag, rep_flag;

   logman("%s:enter", mname);
   (void)dbeng_config_get_replicate_flag(&rep_flag);

   if (!rep_flag)
      {
      dbeng_io_code_log(mname, "replicate flag down", 
                        DBENG_REPLICATION_NOT_ACTIVE);
      return(DBENG_REPLICATION_NOT_ACTIVE);
      }

   if (com == (char *)NULL || !strlen(com))
      {
      dbeng_io_code_log(mname, "nul or empty[com]", DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   nwords = command_words(com);

   if (nwords < 4)
      {
      dbeng_io_code_log(mname, "improper number of parameters",
                        DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   /* get replication code (command word 1), it must fit into 'tmp' */

   if (command_wordlen(com, 1) >= (int)sizeof(tmp))
      {
      dbeng_io_code_log(mname, "word too long[rcode]",
                        DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   if (!command_word(com, tmp, 1))
      {
      dbeng_io_code_log(mname, "bad rc from command_word[tmp(rcode)]",
                        DBENG_INTERNAL_ERROR);
      return(DBENG_INTERNAL_ERROR);
      }

   if (!qatoi(tmp, &rcode))
      {
      dbeng_io_code_log(mname, "non numeric[rcode]", DBENG_INTERNAL_ERROR);
      return(DBENG_INTERNAL_ERROR);
      }

   /* get from port (command word 2), it must fit into 'tmp' */

   if (command_wordlen(com, 2) >= (int)sizeof(tmp))
      {
      dbeng_io_code_log(mname, "word too long[dbengrep_from_port]",
                        DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   if (!command_word(com, tmp, 2))
      {
      dbeng_io_code_log(mname, "bad rc from command_word[tmp"
                        "(dbengrep_from_port)]", DBENG_INTERNAL_ERROR);
      return(DBENG_INTERNAL_ERROR);
      }

   if (!qatoi(tmp, &dbengrep_from_port))
      {
      dbeng_io_code_log(mname, "non numeric[dbengrep_from_port]",
                        DBENG_INTERNAL_ERROR);
      return(DBENG_INTERNAL_ERROR);
      }

   /* get from table name (command word 3), it must fit into the
      fixed size 'dbengrep_from_tname' */

   if (command_wordlen(com, 3) >= (int)sizeof(dbengrep_from_tname))
      {
      dbeng_io_code_log(mname, "word too long[dbengrep_from_tname]",
                        DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   if (!command_word(com, dbengrep_from_tname, 3))
      {
      dbeng_io_code_log(mname, "bad rc from command_word[dbengrep_from_tname]",
                        DBENG_INTERNAL_ERROR);
      return(DBENG_INTERNAL_ERROR);
      }

   /* get destination table name (command word 4) */

   len = command_wordlen(com, 4);

   if (len <= 0)
      {
      dbeng_io_code_log(mname, "bad rc from command_wordlen[tname]",
                        DBENG_INTERNAL_ERROR);
      return(DBENG_INTERNAL_ERROR);
      }

   if ((tname = (char *)malloc(len + 1)) == (char *)NULL)
      {
      dbeng_io_code_log(mname, "alloc fail[tname]", DBENG_MEMORY_FAIL);
      return(DBENG_MEMORY_FAIL);
      }

   if (!command_word(com, tname, 4))
      {
      dbeng_io_code_log(mname, "bad rc from command_word[tname]",
                        DBENG_INTERNAL_ERROR);
      free(tname);
      return(DBENG_INTERNAL_ERROR);
      }

   /* open the destination table */

   if ((ret = dbeng_open_table(tname, &tid)) != DBENG_OK)
      {
      dbeng_io_code_log(mname, "bad rc from dbeng_open_table", ret);
      free(tname);
      return(ret);
      }

   free(tname);

   /* get table pointer from tid */

   if ((rep_table = dbeng_atid_lookup(tid)) == DBENG_OT_NULL)
      {
      dbeng_io_code_log(mname, "bad rc[NULL] from dbeng_atid_lookup",
                        DBENG_INTERNAL_ERROR);
      return(DBENG_INTERNAL_ERROR);
      }

   /* if session table is active, the physical file will have been
      closed by 'dbeng_open_table', re-open it */

   (void)dbeng_config_get_session_flag(&session_flag);

   if (session_flag)
      if ((ret = dbeng_ll_open(rep_table)) != DBENG_OK)
         {
         dbeng_io_code_log(mname, "bad rc from dbeng_ll_open", ret);
         return(ret);
         }

   /* process command */

   switch(rcode)
      {
      case DBENG_SEND_REPLICATE_WRITE:
         ret = dbengrep_recv_write(com);
         break;

      case DBENG_SEND_REPLICATE_DELETE:
         ret = dbengrep_recv_delete(com);
         break;

      case DBENG_SEND_REPLICATE_PACK:
         ret = dbengrep_recv_pack();
         break;

      default:
         logman("%s:exit[%d]:unknown replication command", mname,
                       DBENG_INTERNAL_ERROR);
         (void)dbeng_close_table(rep_table);
         return(DBENG_INTERNAL_ERROR);
      };

   /* always close the table, a handler failure takes precedence over
      the return code of the close */

   if (ret != DBENG_OK)
      (void)dbeng_close_table(rep_table);
   else
      ret = dbeng_close_table(rep_table);

   dbeng_io_code_log(mname, "normal exit", ret);
   return(ret);
}

int dbengrep_send(struct dbeng_table *ot, int rep_code, char *irec)
{
   /* Send a replication command to every listed available
      server. 'rep_code' is the replication code and 'irec'
      is the input record contents which is only used in the
      case of a 'write' replication command. Function returns 
      a 'dbeng' code. */

   struct dbeng_rep *rov;
   struct dbengrep_servers *bserver;
   char mname[] = "dbengrep_send", *qftname, *qdtname, *qrec, *com;
   int ret, len, rep_flag, entry_changed = FALSE;

   /* do not send any replication commands if the flag is down */

   (void)dbeng_config_get_replicate_flag(&rep_flag);

   if (!rep_flag)
      {
      dbeng_io_code_log(mname, "replication not active",
                        DBENG_REPLICATION_NOT_ACTIVE);
      return(DBENG_REPLICATION_NOT_ACTIVE);
      }

   if (ot == DBENG_OT_NULL)
      {
      dbeng_io_code_log(mname, "null[ot]", DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   rov = ot->rep_list;

   /* if replication list is empty, return with ok */

   if (rov == DBENG_ROT_NULL)
      {
      dbeng_io_code_log(mname, "exit:replication list empty", DBENG_OK);
      return(DBENG_OK);
      }

   logman("%s:enter:tid=%d,rep=%d", mname, ot->tid, rep_code);

   /* loop through replication list, sending a replication command
      to each listed server */

   while(rov != DBENG_ROT_NULL)
      {
      /* if we are attempting to send to the same server as the
         last received replication request, refuse */

      if (rov->port == dbengrep_from_port &&
          !strcmp(rov->tname, dbengrep_from_tname))
         {
         logman("%s:send[%d,%s] denied:same table as last rep recv",
                       mname, rov->port, rov->tname);
         rov->port = 0;
         entry_changed = TRUE;
         rov = rov->next;
         continue;
         }

      /* get server host name from 'dbengrep_servers' link list */

      if ((bserver = dbengrep_sll_find(rov->port)) == DBENGREP_SOT_NULL)
         {
         logman("%s:server on port %d is not in server list", mname,
                       rov->port);
         
         /* if server is missing from servers list mark the entry */

         rov->port = 0;
         entry_changed = TRUE;
         rov = rov->next;
         continue;
         }

      /* load host name and port number of selected server */

      if ((ret = dbengrep_set_active(bserver->host, rov->port)) != DBENG_OK)
         {
         logman("%s:skip:bad rc from dbengrep_set_active[%s,%d]",
                       mname, bserver->host, rov->port);
         rov = rov->next;
         continue;
         }

      logman("%s:rep[%d]->%s[%d]", mname, rep_code, dbengrep_hostname,
                    dbengrep_port);

      /* calculate length of command string */

      len = strlen(ot->name) + 3 + strlen(rov->tname) + 3 + 9;

      switch(rep_code)
         {
         case DBENG_SEND_REPLICATE_WRITE:
            if (irec == (char *)NULL || !strlen(irec))
               {
               dbeng_io_code_log(mname, "null or empty[irec]",
                                 DBENG_INVALID_FUNCTION);
               return(DBENG_INVALID_FUNCTION);
               }

            len += strlen(irec) + 3;
            break;

         case DBENG_SEND_REPLICATE_DELETE:
            len += 11;
            break;

         case DBENG_SEND_REPLICATE_PACK:
            break;

         default:
            logman("%s:exit:invalid rep_code[%d]", mname, rep_code);
            return(DBENG_INVALID_FUNCTION);
         };

      if ((com = (char *)malloc(len)) == (char *)NULL)
         {
         logman("%s:exit:rep[%d]->%s[%d]:alloc fail[com]", mname, 
                       rep_code, dbengrep_hostname, dbengrep_port);
         return(DBENG_MEMORY_FAIL);
         }

      /* build command string parts */

      if ((qftname = initqstring(ot->name)) == (char *)NULL)
         {
         logman("%s:exit:rep[%d]->%s[%d]:alloc fail[qftname]", mname, 
                       rep_code, dbengrep_hostname, dbengrep_port);
         free(com);
         return(DBENG_MEMORY_FAIL);
         }
        
      if ((qdtname = initqstring(rov->tname)) == (char *)NULL)
         {
         logman("%s:exit:rep[%d]->%s[%d]:alloc fail[qdtname]", mname, 
                       rep_code, dbengrep_hostname, dbengrep_port);
         free(com);
         free(qftname);
         return(DBENG_MEMORY_FAIL);
         }

      /* build command string */

      switch(rep_code)
         {
         case DBENG_SEND_REPLICATE_WRITE:
            if ((qrec = initqstring(irec)) == (char *)NULL)
               {
               logman("%s:exit:rep[%d]->%s[%d]:alloc fail[irec]", 
                             mname, rep_code, dbengrep_hostname, dbengrep_port);
               free(com);
               free(qftname);
               free(qdtname);
               return(DBENG_MEMORY_FAIL);
               }
            
            sprintf(com, "%d %d %s %s %s", rep_code, server_port, 
                    qftname, qdtname, qrec);
            free(qrec);
            break;

         case DBENG_SEND_REPLICATE_DELETE:
            sprintf(com, "%d %d %s %s %ld", rep_code, server_port, 
                    qftname, qdtname, ot->orig_position);
            break;

         case DBENG_SEND_REPLICATE_PACK:
            sprintf(com, "%d %d %s %s", rep_code, server_port, 
                    qftname, qdtname);
            break;
         };

      free(qftname);
      free(qdtname);

      /* send replication command and wait for reply, do not exit
         this function if command fails */

      if ((ret = dbengrep_send_recv(com)) != DBENG_OK)
         logman("%s:replication command failed,rc=%d", mname, ret);

      free(com);
      rov = rov->next;
      }

   /* if an error was found in the replication list, re-load */

   if (entry_changed)
      {
      logman("%s:rep entry invalid:re-loading rep list", mname);
      dbengrep_init();
      }

   dbeng_io_code_log(mname, "normal exit", DBENG_OK);
   return(DBENG_OK);
}

int dbengrep_rll_delete_all(struct dbeng_table *ot)
{
   /* Free every node (and its table name) of the replication link
      list belonging to the table 'ot' and leave the list empty.
      Function returns 'DBENG_OK', or 'DBENG_INVALID_FUNCTION' when
      'ot' is null. */

   char mname[] = "dbengrep_rll_delete_all";
   struct dbeng_rep *node, *following;

   if (ot == DBENG_OT_NULL)
      {
      dbeng_io_code_log(mname, "null[ot]", DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   logman("%s:enter:tid=%d", mname, ot->tid);

   for(node = ot->rep_list; node != DBENG_ROT_NULL; node = following)
      {
      /* remember the successor before the node goes away */

      following = node->next;

      /* 'free' accepts a null pointer, no guard required */

      free(node->tname);
      free(node);
      }

   ot->rep_list = DBENG_ROT_NULL;
   dbeng_io_code_log(mname, "normal exit", DBENG_OK);
   return(DBENG_OK);
}

int dbengrep_rll_load(struct dbeng_table *ot, char *thelist)
{
   /* Load or re-load the replication details into the
      replication link list of the table 'ot'. 'thelist' is the
      replication list as stored in the catalog: entries separated
      by 'DBENG_LIST_DELIM', each entry holding a port and a table
      name separated by a comma. Any existing list members are
      removed first. Only entries whose port belongs to a server in
      the available server list are added.

      Both sub-fields are extracted into a scratch buffer as large as
      the whole list, so an over-long port field cannot overrun a
      fixed size buffer.

      Function returns a 'dbeng' code ('DBENG_NO_REPLICATION_ENTRY'
      when 'thelist' is empty). */

   char mname[] = "dbengrep_rll_load", *entry, *field;
   int ret, nentries, i, port, len;

   if (ot == DBENG_OT_NULL)
      {
      dbeng_io_code_log(mname, "null[ot]", DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   if (thelist == (char *)NULL)
      {
      dbeng_io_code_log(mname, "null[thelist]", DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   logman("%s:enter:tid=%d", mname, ot->tid);

   /* clear any list members if present */

   if ((ret = dbengrep_rll_delete_all(ot)) != DBENG_OK)
      {
      dbeng_io_code_log(mname, "bad rc from dbengrep_rll_delete_all", ret);
      return(ret);
      }

   len = strlen(thelist);

   if (!len)
      {
      dbeng_io_code_log(mname, "no rep entries", DBENG_NO_REPLICATION_ENTRY);
      return(DBENG_NO_REPLICATION_ENTRY);
      }

   nentries = ll_words(thelist, DBENG_LIST_DELIM);
   logman("%s:got %d replication entries", mname, nentries);

   if (nentries <= 0)
      {
      dbeng_io_code_log(mname, "bad nentires", DBENG_INTERNAL_ERROR);
      return(DBENG_INTERNAL_ERROR);
      }

   /* no entry and no sub-field can be longer than the list itself */

   if ((entry = (char *)malloc(len + 1)) == (char *)NULL)
      {
      dbeng_io_code_log(mname, "alloc fail[entry]", DBENG_MEMORY_FAIL);
      return(DBENG_MEMORY_FAIL);
      }

   if ((field = (char *)malloc(len + 1)) == (char *)NULL)
      {
      dbeng_io_code_log(mname, "alloc fail[tname]", DBENG_MEMORY_FAIL);
      free(entry);
      return(DBENG_MEMORY_FAIL);
      }

   /* loop through list matching the port number given in the catalog
      to the list of available Bbuuzzb servers, if there is a match,
      add the entry to the replication link list; 'ret' is 'DBENG_OK'
      on entry to the loop and is only changed when the loop is left
      early because of an error */

   for(i = 1; i <= nentries; i++)
      {
      if (!ll_word(thelist, entry, i, DBENG_LIST_DELIM))
         {
         logman("%s:exit:bad rc[FALSE] from ll_word[%d]", mname, i);
         ret = DBENG_INTERNAL_ERROR;
         break;
         }

      /* sub-field one: port number */

      if (!ll_word(entry, field, 1, ','))
         {
         logman("%s:exit:bad rc[FALSE] from ll_word[%d,port_char]",
                       mname, i);
         ret = DBENG_INTERNAL_ERROR;
         break;
         }

      if (!qatoi(field, &port))
         {
         logman("%s:exit:bad rc[FALSE] from qatoi[%d,port]", mname, i);
         ret = DBENG_INTERNAL_ERROR;
         break;
         }

      /* entries for servers that are not available are ignored */

      if (dbengrep_sll_find(port) == DBENGREP_SOT_NULL)
         continue;

      /* sub-field two: table name on the target server */

      if (!ll_word(entry, field, 2, ','))
         {
         logman("%s:exit:bad rc[FALSE] from ll_word[%d,tname]",
                       mname, i);
         ret = DBENG_INTERNAL_ERROR;
         break;
         }

      logman("%s:found match:p=%d,t=%s", mname, port, field);

      if ((ret = dbengrep_rll_add(ot, field, port)) != DBENG_OK)
         {
         logman("%s:exit:bad rc[%d] from dbengrep_rll_add[%d]",
                       mname, ret, i);
         break;
         }
      }

   free(entry);
   free(field);

   if (ret == DBENG_OK)
      dbeng_io_code_log(mname, "normal exit", DBENG_OK);

   return(ret);
}

/* private functions */

/* replication receive and process command functions */

static int dbengrep_recv_write(char *com)
{
   /* Handle a received 'write' replication command: take the record
      contents from command word five of 'com' and write them as a
      new record to the destination table 'rep_table'. Function
      returns a 'dbeng' code. */

   char mname[] = "dbengrep_recv_write";
   char *record;
   int rc;

   logman("%s:enter", mname);

   if (com == (char *)NULL || com[0] == '\0')
      {
      dbeng_io_code_log(mname, "null[com]", DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   if (command_words(com) < 5)
      {
      dbeng_io_code_log(mname, "exit:improper number of command words",
                        DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   /* the record is one word of the command, a buffer as large as the
      command is therefore always sufficient */

   record = (char *)malloc(strlen(com) + 1);

   if (record == (char *)NULL)
      {
      dbeng_io_code_log(mname, "alloc fail[irec]", DBENG_MEMORY_FAIL);
      return(DBENG_MEMORY_FAIL);
      }

   if (command_word(com, record, 5))
      {
      /* write a new record */

      rc = dbeng_write_string_recd(rep_table, record);

      if (rc == DBENG_OK)
         dbeng_io_code_log(mname, "normal exit", rc);
      else
         dbeng_io_code_log(mname, "exit:bad rc from dbeng_write_string_recd",
                           rc);
      }
   else
      {
      rc = DBENG_INTERNAL_ERROR;
      dbeng_io_code_log(mname, "exit:bad rc[FALSE] from command_word[irec]",
                        DBENG_INTERNAL_ERROR);
      }

   free(record);
   return(rc);
}

static int dbengrep_recv_delete(char *com)
{
   /* Handle a received 'delete' replication command: position the
      destination table 'rep_table' at the file position given in
      command word five of 'com', read the record found there and
      delete it. Function returns a 'dbeng' code. */

   char mname[] = "dbengrep_recv_delete";
   char *outcome;
   long where;
   int rc;

   logman("%s:enter", mname);

   if (com == (char *)NULL || com[0] == '\0')
      {
      dbeng_io_code_log(mname, "null[com]", DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   if (command_words(com) < 5)
      {
      dbeng_io_code_log(mname, "exit:improper number of command words",
                        DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   /* get file position (command word 5) */

   where = dbengrep_recv_get_fpos(com);

   if (where == -1L)
      {
      dbeng_io_code_log(mname, "exit:bad rc[-1] from dbengep_recv_get_fpos",
                        DBENG_INTERNAL_ERROR);
      return(DBENG_INTERNAL_ERROR);
      }

   /* set file position, get the record, delete it; the first step
      that fails ends the sequence and names the log message */

   if ((rc = dbeng_set_pos(rep_table, where)) != DBENG_OK)
      outcome = "exit:bad rc from dbeng_set_pos";
   else if ((rc = dbeng_get_recd(rep_table)) != DBENG_OK)
      outcome = "exit:bad rc from dbeng_get_recd";
   else if ((rc = dbeng_delete_recd(rep_table)) != DBENG_OK)
      outcome = "exit:bad rc from dbeng_delete_recd";
   else
      outcome = "normal exit";

   dbeng_io_code_log(mname, outcome, rc);
   return(rc);
}

static int dbengrep_recv_pack(void)
{
   /* Handle a received 'pack' replication command by packing the
      destination table 'rep_table'. Function returns the 'dbeng'
      code of 'dbeng_pack_table'. */

   char mname[] = "dbengrep_recv_pack";
   int rc;

   logman("%s:enter", mname);
   rc = dbeng_pack_table(rep_table);

   if (rc == DBENG_OK)
      dbeng_io_code_log(mname, "normal exit", rc);
   else
      dbeng_io_code_log(mname, "exit:bad rc from dbeng_pack_table", rc);

   return(rc);
}

static long dbengrep_recv_get_fpos(char *com)
{
   /* Obtain the file position from the received replication
      request which is always in command word five. The word is
      extracted into a buffer as large as the whole command so that
      an over-long word cannot overrun it. Function returns the file
      position upon success, -1 otherwise (null or empty command,
      fewer than five words, allocation failure or a non-numeric
      word). */

   char *fpos_char;
   long fpos;
   int ok;

   if (com == (char *)NULL || !strlen(com))
      return(-1L);

   if (command_words(com) < 5)
      return(-1L);

   if ((fpos_char = (char *)malloc(strlen(com) + 1)) == (char *)NULL)
      return(-1L);

   /* 'fpos' is only meaningful when both steps succeed */

   ok = command_word(com, fpos_char, 5) && qatol(fpos_char, &fpos);
   free(fpos_char);

   if (!ok)
      return(-1L);

   return(fpos);
}

/* replication link list functions */

static int dbengrep_rll_add(struct dbeng_table *ot, char *tname, int port_num)
{
   /* Append a replication entry (target table name 'tname' on the
      server with port 'port_num') to the end of the replication
      link list of the table 'ot'. The table name is copied.
      Function returns 'DBENG_OK' upon success, a 'dbeng' code
      otherwise. */

   char mname[] = "dbengrep_rll_add";
   struct dbeng_rep *node, *tail;
   int name_len;

   /* reasonableness checks */

   if (ot == DBENG_OT_NULL)
      {
      dbeng_io_code_log(mname, "null[ot]", DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   if (tname == (char *)NULL || tname[0] == '\0')
      {
      dbeng_io_code_log(mname, "null or empty[tname]", DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   if (port_num <= 0)
      {
      dbeng_io_code_log(mname, "out of range[port_num]",
                        DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   name_len = strlen(tname);
   logman("%s:enter:tid=%d,tname=%s,l=%d,port_num=%d", mname, ot->tid,
                 tname, name_len, port_num);

   /* build the new node */

   node = (struct dbeng_rep *)malloc(sizeof(struct dbeng_rep));

   if (node == DBENG_ROT_NULL)
      {
      dbeng_io_code_log(mname, "alloc fail[dbengrep_entry]",
                        DBENG_MEMORY_FAIL);
      return(DBENG_MEMORY_FAIL);
      }

   node->tname = (char *)malloc(name_len + 1);

   if (node->tname == (char *)NULL)
      {
      dbeng_io_code_log(mname, "alloc fail[dbengrep_entry->tname]",
                        DBENG_MEMORY_FAIL);
      free(node);
      return(DBENG_MEMORY_FAIL);
      }

   strcpy(node->tname, tname);
   node->port = port_num;
   node->next = DBENG_ROT_NULL;

   /* an empty list simply starts with the new node */

   if (ot->rep_list == DBENG_ROT_NULL)
      {
      ot->rep_list = node;
      dbeng_io_code_log(mname, "normal exit at head of list", DBENG_OK);
      return(DBENG_OK);
      }

   /* otherwise walk to the last node and hook the new one behind it */

   for(tail = ot->rep_list; tail->next != DBENG_ROT_NULL; tail = tail->next)
      ;

   tail->next = node;
   dbeng_io_code_log(mname, "normal exit", DBENG_OK);
   return(DBENG_OK);
}

static struct dbeng_rep *dbengrep_rll_find(struct dbeng_table *ot,
                                           int port_num)
{
   /* Search the replication link list of the table 'ot' for the
      first node whose port is 'port_num'. Function returns a pointer
      to that node, or a null pointer when 'ot' is null or no node
      matches. */

   char mname[] = "dbengrep_rll_find";
   struct dbeng_rep *node;

   if (ot == DBENG_OT_NULL)
      {
      logman("%s:exit:null[ot]", mname);
      return(DBENG_ROT_NULL);
      }

   logman("%s:enter:tid=%d,port=%d", mname, ot->tid, port_num);

   for(node = ot->rep_list; node != DBENG_ROT_NULL; node = node->next)
      {
      if (node->port != port_num)
         continue;

      logman("%s:exit:found match", mname);
      return(node);
      }

   logman("%s:exit:no match", mname);
   return(DBENG_ROT_NULL);
}

static int dbengrep_rll_delete(struct dbeng_table *ot, int port_num)
{
   /* Remove the first replication entry with the port 'port_num'
      from the replication link list of the table 'ot' and free it.
      Function returns 'DBENG_OK' upon success, 'DBENG_NOT_EXIST'
      when no entry has that port and 'DBENG_INVALID_FUNCTION' when
      'ot' is null. */

   char mname[] = "dbengrep_rll_delete";
   struct dbeng_rep **link;
   struct dbeng_rep *victim;

   if (ot == DBENG_OT_NULL)
      {
      dbeng_io_code_log(mname, "null[ot]", DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   logman("%s:enter:tid=%d,port=%d", mname, ot->tid, port_num);

   /* 'link' addresses the pointer that leads to the node under
      inspection: the list head at first, a 'next' member afterwards;
      unlinking is then the same operation wherever the node sits */

   link = &ot->rep_list;

   while(*link != DBENG_ROT_NULL && (*link)->port != port_num)
      link = &(*link)->next;

   victim = *link;

   /* if no match, error */

   if (victim == DBENG_ROT_NULL)
      {
      dbeng_io_code_log(mname, "entry not found", DBENG_NOT_EXIST);
      return(DBENG_NOT_EXIST);
      }

   *link = victim->next;

   /* 'free' accepts a null pointer, no guard required */

   free(victim->tname);
   free(victim);
   dbeng_io_code_log(mname, "normal exit", DBENG_OK);
   return(DBENG_OK);
}

/* available Bbuuzzb server link list functions */

static int dbengrep_sll_load(void)
{
   /* Rebuild the link list of available 'Bbuuzzb' servers from
      the list 'socloc' returns for 'DBENG_SERVICE_NAME'. Any
      existing list is discarded first. For every list entry the
      third word is taken as the TCP port and the second word as
      the host name; the entry whose port equals 'server_port'
      (this server) is not added.

      Function returns 'DBENG_OK' if at least one other server
      was loaded, 'DBENG_SOCLOC_NO_INIT' if 'socloc' is not
      initialized, 'DBENG_NO_SERVER' if 'socloc' reports no
      servers or only this one, 'DBENG_MEMORY_FAIL' on an
      allocation failure, 'DBENG_INTERNAL_ERROR' if an entry
      cannot be parsed, or the code returned by
      'dbengrep_sll_add'. When a failure occurs part way through
      the list, the entries already added are left in place. */

   char mname[] = "dbengrep_sll_load", mes[128];
   char *thelist = (char *)NULL;
   char *entry = (char *)NULL;
   char *word = (char *)NULL;
   int ret, nwords, i, port;
   size_t len;

   logman("%s:enter", mname);

   /* make sure 'socloc' has already been initialized */

   if ((ret = sloc_is_init()) != SL_OK)
      {
      sl_code_string(ret, mes);
      logman("%s:bad rc[%d,%s][sloc_is_init]", mname,
                    ret, mes);
      return(DBENG_SOCLOC_NO_INIT);
      }

   /* if the link list is not empty, clear it */

   if (dbengrep_shead != DBENGREP_SOT_NULL)
      dbengrep_sll_delete_all();

   if ((thelist = (char *)malloc(SL_MAXCOMMAND)) == (char *)NULL)
      {
      dbeng_io_code_log(mname, "alloc fail[thelist]", DBENG_MEMORY_FAIL);
      return(DBENG_MEMORY_FAIL);
      }

   /* get list of all 'Bbuuzzb' servers from 'socloc' */

   if ((ret = sloc_find_list(DBENG_SERVICE_NAME, thelist)) != SL_OK)
      {
      sl_code_string(ret, mes);
      logman("%s:bad rc[%s] from sloc_find_list", mname, mes);
      ret = DBENG_NO_SERVER;
      goto done;
      }

   len = strlen(thelist);
   nwords = ll_words(thelist, SL_LIST_DELIM);

   if (nwords < 1)
      {
      dbeng_io_code_log(mname, "socloc server list empty", DBENG_NO_SERVER);
      ret = DBENG_NO_SERVER;
      goto done;
      }

   /* neither a list entry nor a word inside an entry can be longer
      than the whole list, so two buffers of that size hold anything
      extracted below (the port word included, which previously went
      into a fixed 25 byte array) */

   if ((entry = (char *)malloc(len + 1)) == (char *)NULL)
      {
      dbeng_io_code_log(mname, "alloc fail[entry]", DBENG_MEMORY_FAIL);
      ret = DBENG_MEMORY_FAIL;
      goto done;
      }

   if ((word = (char *)malloc(len + 1)) == (char *)NULL)
      {
      dbeng_io_code_log(mname, "alloc fail[word]", DBENG_MEMORY_FAIL);
      ret = DBENG_MEMORY_FAIL;
      goto done;
      }

   /* go through list and add each server to link list (except this server) */

   for(i = 1; i <= nwords; i++)
      {
      if (!ll_word(thelist, entry, i, SL_LIST_DELIM))
         {
         logman("%s:bad rc[FALSE] from ll_word[entry=%d]", mname, i);
         ret = DBENG_INTERNAL_ERROR;
         goto done;
         }

      /* third word of the entry is the TCP port */

      if (!command_word(entry, word, 3))
         {
         logman("%s:bad rc[FALSE] from command_word[port_char,entry=%d]",
                       mname, i);
         ret = DBENG_INTERNAL_ERROR;
         goto done;
         }

      if (!qatoi(word, &port))
         {
         logman("%s:bad rc[FALSE] from qatoi[port_char,entry=%d]",
                       mname, i);
         ret = DBENG_INTERNAL_ERROR;
         goto done;
         }

      /* do not add this server to the link list */

      if (port == server_port)
         continue;

      /* second word of the entry is the host name */

      if (!command_word(entry, word, 2))
         {
         logman("%s:bad rc[FALSE] from command_word[host,entry=%d]",
                       mname, i);
         ret = DBENG_INTERNAL_ERROR;
         goto done;
         }

      /* 'dbengrep_sll_add' stores its own copy of the host name */

      if ((ret = dbengrep_sll_add(word, port)) != DBENG_OK)
         {
         dbeng_io_code_log(mname, "bad rc from dbengrep_sll_add", ret);
         goto done;
         }
      }

   if (dbengrep_shead == DBENGREP_SOT_NULL)
      {
      dbeng_io_code_log(mname, "exit:no other Bbuuzzb servers",
                        DBENG_NO_SERVER);
      ret = DBENG_NO_SERVER;
      }
   else
      {
      dbeng_io_code_log(mname, "normal exit", DBENG_OK);
      ret = DBENG_OK;
      }

done:

   /* single release point, 'free' ignores buffers never allocated */

   free(word);
   free(entry);
   free(thelist);
   return(ret);
}

static int dbengrep_sll_add(char *hname, int port_num)
{
   /* Append a Bbuuzzb server entry to the end of the server link
      list. The entry keeps its own copy of the host name 'hname'
      together with the port 'port_num'. Function returns
      'DBENG_OK' upon success, 'DBENG_INVALID_FUNCTION' if 'hname'
      is null/empty or 'port_num' is not positive, and
      'DBENG_MEMORY_FAIL' if storage cannot be obtained. */

   struct dbengrep_servers *fresh;
   struct dbengrep_servers **link;
   char mname[] = "dbengrep_sll_add";
   int at_head;

   /* reasonableness checks */

   if (hname == (char *)NULL || hname[0] == '\0')
      {
      dbeng_io_code_log(mname, "invalid parameter[hname]",
                        DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   if (port_num <= 0)
      {
      dbeng_io_code_log(mname, "port out of range", DBENG_INVALID_FUNCTION);
      return(DBENG_INVALID_FUNCTION);
      }

   logman("%s:enter:hname=%s,port_num=%d", mname, hname, port_num);

   /* build the new node */

   fresh = (struct dbengrep_servers *)malloc(sizeof(struct dbengrep_servers));

   if (fresh == DBENGREP_SOT_NULL)
      {
      dbeng_io_code_log(mname, "alloc fail[dbengrep_entry]",
                        DBENG_MEMORY_FAIL);
      return(DBENG_MEMORY_FAIL);
      }

   fresh->host = (char *)malloc(strlen(hname) + 1);

   if (fresh->host == (char *)NULL)
      {
      dbeng_io_code_log(mname, "alloc fail[dbengrep_entry->host]",
                        DBENG_MEMORY_FAIL);
      free(fresh);
      return(DBENG_MEMORY_FAIL);
      }

   strcpy(fresh->host, hname);
   fresh->port = port_num;
   fresh->next = DBENGREP_SOT_NULL;

   /* an empty list means the new node becomes the head, this only
      affects which exit message is logged */

   at_head = (dbengrep_shead == DBENGREP_SOT_NULL);

   /* advance to the terminating link (the head pointer itself when
      the list is empty) and hook the new node into it */

   for(link = &dbengrep_shead; *link != DBENGREP_SOT_NULL;
       link = &(*link)->next)
      ;

   *link = fresh;

   if (at_head)
      dbeng_io_code_log(mname, "normal exit at head of list", DBENG_OK);
   else
      dbeng_io_code_log(mname, "normal exit", DBENG_OK);

   return(DBENG_OK);
}

static int dbengrep_sll_delete(int port_num)
{
   /* Remove the first Bbuuzzb server entry whose port equals
      'port_num' from the server link list and release it along
      with its host name. The TCP/IP port serves as the entry key.
      Function returns 'DBENG_OK' upon success, 'DBENG_NOT_EXIST'
      if no entry carries the port. */

   struct dbengrep_servers **link = &dbengrep_shead;
   struct dbengrep_servers *victim;
   char mname[] = "dbengrep_sll_delete";

   logman("%s:enter,port=%d", mname, port_num);

   /* 'link' addresses the pointer that leads to the current entry
      (the head pointer at first), so the head needs no special
      handling when unlinking */

   while(*link != DBENGREP_SOT_NULL && (*link)->port != port_num)
      link = &(*link)->next;

   victim = *link;

   /* if no match, error */

   if (victim == DBENGREP_SOT_NULL)
      {
      dbeng_io_code_log(mname, "entry not found", DBENG_NOT_EXIST);
      return(DBENG_NOT_EXIST);
      }

   /* bypass the entry, then release it */

   *link = victim->next;
   free(victim->host);
   free(victim);
   dbeng_io_code_log(mname, "normal exit", DBENG_OK);
   return(DBENG_OK);
}

static struct dbengrep_servers *dbengrep_sll_find(int port_num)
{
   /* Search the server link list for the entry whose port equals
      'port_num'. Function returns a pointer to the first matching
      entry, 'DBENGREP_SOT_NULL' if 'port_num' is not positive or
      no entry matches. */

   struct dbengrep_servers *node;
   char mname[] = "dbengrep_sll_find";

   logman("%s:enter:port_num=%d", mname, port_num);

   if (port_num <= 0)
      {
      logman("%s:exit:out of range[port_num]", mname);
      return(DBENGREP_SOT_NULL);
      }

   for(node = dbengrep_shead; node != DBENGREP_SOT_NULL; node = node->next)
      {
      if (node->port != port_num)
         continue;

      logman("%s:normal exit:found match", mname);
      return(node);
      }

   logman("%s:normal exit:no match found", mname);
   return(DBENGREP_SOT_NULL);
}

static void dbengrep_sll_delete_all(void)
{
   /* Release every entry of the server link list (each host name
      and the entry itself) and leave the list head set to
      'DBENGREP_SOT_NULL'. An already empty list is only logged. */

   struct dbengrep_servers *node;
   struct dbengrep_servers *following;
   char mname[] = "dbengrep_sll_delete_all";

   logman("%s:enter", mname);

   if (dbengrep_shead == DBENGREP_SOT_NULL)
      {
      logman("%s:exit:list empty", mname);
      return;
      }

   for(node = dbengrep_shead; node != DBENGREP_SOT_NULL; node = following)
      {
      /* pick up the successor before the entry is released */

      following = node->next;
      free(node->host);
      free(node);
      }

   dbengrep_shead = DBENGREP_SOT_NULL;
   logman("%s:normal exit", mname);
}

static int dbengrep_set_active(char *hname, int port)
{
   /* Record the destination Bbuuzzb server: 'hname' is copied into
      'dbengrep_hostname' and 'port' is stored in 'dbengrep_port'.
      Neither is touched when a parameter is rejected. Function
      returns 'DBENG_OK' upon success, 'DBENG_INVALID_FUNCTION' if
      'hname' is null/empty or 'port' is not positive. */

   char mname[] = "dbengrep_set_active";
   char *outcome;
   int ret = DBENG_INVALID_FUNCTION;

   logman("%s:enter", mname);

   if (hname == (char *)NULL || hname[0] == '\0')
      outcome = "null or empty[hname]";
   else if (port <= 0)
      outcome = "out of range[port]";
   else
      {
      /* NOTE(review): the copy is unbounded, this assumes
         'dbengrep_hostname' is large enough for any host name
         passed in - confirm against its declaration */

      strcpy(dbengrep_hostname, hname);
      dbengrep_port = port;
      outcome = "normal exit";
      ret = DBENG_OK;
      }

   dbeng_io_code_log(mname, outcome, ret);
   return(ret);
}

static int dbengrep_send_recv(char *com)
{
   /* Send the command 'com', prefixed with the
      'DBENG_SEND_REPLICATE' code and a space, to the active
      Bbuuzzb server and receive the reply, which must be a
      number. A connection is opened for the exchange and is
      always closed again before returning.

      Function returns the numeric reply of the server upon
      success, 'DBENG_NO_SERVER' if no port has been set,
      'DBENG_INVALID_FUNCTION' if 'com' is null or empty,
      'DBENG_VC_ERROR' if the connect, send or receive fails,
      'DBENG_MEMORY_FAIL' if the message buffer cannot be
      allocated and 'DBENG_INTERNAL_ERROR' if the reply is not
      a number. */

   char *mess = (char *)NULL;
   char code[32];
   size_t len, floor_len;
   int ret, reply;

   if (!dbengrep_port)
      return(DBENG_NO_SERVER);

   if (com == (char *)NULL || !strlen(com))
      return(DBENG_INVALID_FUNCTION);

   // connect to server

   if (!dbengrep_connect())
      return(DBENG_VC_ERROR);

   // size the buffer from the formatted code (code, one space,
   // command, terminator) instead of assuming the code is at most
   // three characters long; never go below the former size since
   // the reply is received into the same buffer

   sprintf(code, "%d", DBENG_SEND_REPLICATE);
   len = strlen(code) + 1 + strlen(com) + 1;
   floor_len = strlen(com) + 5;

   if (len < floor_len)
      len = floor_len;

   if ((mess = (char *)malloc(len)) == (char *)NULL)
      {
      ret = DBENG_MEMORY_FAIL;
      goto hangup;
      }

   memset(mess, 0, len);
   sprintf(mess, "%s %s", code, com);

   // send message

   if (!ipc_send(dbengrep_clientSocket, mess))
      {
      ret = DBENG_VC_ERROR;
      goto hangup;
      }

   // receive return code
   // NOTE(review): assumes 'ipc_recv' never delivers more than
   // fits into 'mess' - confirm against its contract

   memset(mess, 0, len);

   if (!ipc_recv(dbengrep_clientSocket, mess))
      {
      ret = DBENG_VC_ERROR;
      goto hangup;
      }

   // make sure its a number

   if (qatoi(mess, &reply))
      ret = reply;
   else
      ret = DBENG_INTERNAL_ERROR;

hangup:

   // single exit once connected: close the socket, release the buffer

#ifdef OS_WIN32
   (void)closesocket(dbengrep_clientSocket);
#else
   close(dbengrep_clientSocket);
#endif

   free(mess);
   return(ret);
}

static int dbengrep_connect(void)
{
   /* Connect to a Bbuuzzb server. An attempt to open the TCP
      socket is made. The host name 'dbengrep_hostname' and the
      TCP port 'dbengrep_port' are assumed to be already loaded.
      Function returns 'TRUE' if a successful connection was made,
      'FALSE' otherwise. When 'FALSE' is returned no socket is left
      open: a socket that was created but could not be connected
      is closed here and 'dbengrep_clientSocket' is reset to
      'INVALID_SOCKET', so the caller has nothing to clean up. */

   // make sure host name and port are loaded

   if (!dbengrep_port || !strlen(dbengrep_hostname))
      return(FALSE);

   // resolve server host name

   dbengrep_lpHostEnt = gethostbyname(dbengrep_hostname);

   if (!dbengrep_lpHostEnt)
      return(FALSE);

   // create the socket

#ifdef OS_WIN32
   dbengrep_clientSocket = socket(PF_INET, SOCK_STREAM, DEFAULT_PROTOCOL);
#endif

#ifdef OS_UNIX
   dbengrep_clientSocket = socket(AF_INET, SOCK_STREAM, DEFAULT_PROTOCOL);
#endif

   if (dbengrep_clientSocket == INVALID_SOCKET)
      return(FALSE);

   // load client address data (first address of the resolved host)

   memset(&dbengrep_sockClientAddr, 0, sizeof(dbengrep_sockClientAddr));
   dbengrep_sockClientAddr.sin_family = AF_INET;
   dbengrep_sockClientAddr.sin_port = htons(dbengrep_port);

#ifdef OS_WIN32
   dbengrep_sockClientAddr.sin_addr =
     *((LPIN_ADDR)*dbengrep_lpHostEnt->h_addr_list);
#endif

#ifdef OS_UNIX
   dbengrep_sockClientAddr.sin_addr.s_addr =
      ((struct in_addr *)(dbengrep_lpHostEnt->h_addr))->s_addr;
#endif

   // connect to server

#ifdef OS_WIN32
   if (connect(dbengrep_clientSocket, (LPSOCKADDR)&dbengrep_sockClientAddr,
       sizeof(dbengrep_sockClientAddr)))
#endif

#ifdef OS_UNIX
   if (connect(dbengrep_clientSocket, (SA *)&dbengrep_sockClientAddr,
      sizeof(dbengrep_sockClientAddr)) == SOCKET_ERROR)
#endif
      {
      // the socket was created above, release it instead of
      // leaking one descriptor per failed connection attempt

#ifdef OS_WIN32
      (void)closesocket(dbengrep_clientSocket);
#else
      close(dbengrep_clientSocket);
#endif

      dbengrep_clientSocket = INVALID_SOCKET;
      return(FALSE);
      }

   return(TRUE);
}

/* [<][>][^][v][top][bottom][index][help] */