/* [<][>][^][v][top][bottom][index][help] */
DEFINITIONS
This source file includes following definitions.
- dbengrep_init
- dbengrep_end
- dbengrep_recv
- dbengrep_send
- dbengrep_rll_delete_all
- dbengrep_rll_load
- dbengrep_recv_write
- dbengrep_recv_delete
- dbengrep_recv_pack
- dbengrep_recv_get_fpos
- dbengrep_rll_add
- dbengrep_rll_find
- dbengrep_rll_delete
- dbengrep_sll_load
- dbengrep_sll_add
- dbengrep_sll_delete
- dbengrep_sll_find
- dbengrep_sll_delete_all
- dbengrep_set_active
- dbengrep_send_recv
- dbengrep_connect
/* Bbuuzzb replication management module.
Rick Smereka, Copyright (C) 2002-2004.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, get a copy via the Internet at
http://gnu.org/copyleft/gpl.html or write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston,
MA 02111-1307 USA
You can contact the author via email at rsmereka@future-lab.com
This module should only be linked with the Bbuuzzb database server.
Currently, replication is only available for the TCP/IP IPC method. This
module interfaces with the 'socloc' API to obtain the details of
other running Bbuuzzb servers.
This module manages two link lists. The first is 'dbengrep_servers'
which contains a list of currently available 'Bbuuzzb' servers
(not including this one). The second link list is 'dbeng_rep'
which is contained within each table in use ('dbeng_table').
When 'dbengrep_init' is called, the 'dbengep_servers' link
list will be loaded and each current table in use will have
its replication details loaded (from the catalog) into
the 'dbeng_rep' link list. Once the replication initialization
is complete, all non-system tables will have their
replication details automatically loaded when the table is
opened.
When a record change or delete is made, the 'Bbuuzzb' servers listed
in the replication link list will each receive the corresponding change
command.
Replication details are stored in field three of the
catalog. This field contains a 'sub-field' array
with each 'sub-field' containing two 'sub-sub-fields'
Here is the layout:
Sub-Sub-Field Description
1 TCP port of target Bbuuzzb server
2 logical table name on target server
Notice that you must always supply the logical table name which
means that the target table must also be cataloged and the
catalog flag on all listed servers (and this one) must also be
high. The replication flag of this server and every server
listed in the replication details must be high also.
Original Linux version. Jan/2002, Rick Smereka
Ported to Debian Linux. Nov/2002, Rick Smereka
Changed call to 'connect' to use 'SA' definition (defined in
'flsocket.h'). Jun/2003, Rick Smereka
Added a parameter to the function 'dbengrep_rll_load.
Modified function 'dbengrep_init' to call 'dbeng_catalog'
functions to acquire replication list entries.
Changed all logging calls from 'sys_log' to 'logman'.
Mar/2004, Rick Smereka */
#include "stdhead.h"
#include "flsocket.h"
#include "dbmess.h"
#include "dbeng.h"
#include "dbengcfg.h"
#include "dbengrep.h"
#include "dbengcat.h"
#include "socloc.h"
#include "sloc.h"
/* module globals */
/* pointer to top of available Bbuuzzb server link list */
struct dbengrep_servers *dbengrep_shead = DBENGREP_SOT_NULL;
/* pointer to destination table */
struct dbeng_table *rep_table;
/* this server port number (declared in dbsrv.c) */
extern int server_port;
/* pointer to top of 'dbeng_table' list (declared in dbeng.c) */
extern struct dbeng_table *dbeng_head;
/* socket globals */
#ifdef OS_WIN32
SOCKET dbengrep_clientSocket;
SOCKADDR_IN dbengrep_sockClientAddr;
LPHOSTENT dbengrep_lpHostEnt;
#endif
#ifdef OS_UNIX
int dbengrep_clientSocket;
struct sockaddr_in dbengrep_sockClientAddr;
struct hostent *dbengrep_lpHostEnt;
#endif
char dbengrep_hostname[128]; /* host name of replication target */
int dbengrep_port; /* port of replication target */
char dbengrep_from_tname[128]; /* table name of replication source */
int dbengrep_from_port; /* port of replication source */
/* private functions */
static int dbengrep_recv_write(char *);
static int dbengrep_recv_delete(char *);
static int dbengrep_recv_pack(void);
static long dbengrep_recv_get_fpos(char *);
static int dbengrep_rll_add(struct dbeng_table *, char *, int);
static struct dbeng_rep *dbengrep_rll_find(struct dbeng_table *, int);
static int dbengrep_rll_delete(struct dbeng_table *, int);
static int dbengrep_sll_load(void);
static int dbengrep_sll_add(char *, int);
static int dbengrep_sll_delete(int);
static struct dbengrep_servers *dbengrep_sll_find(int);
static void dbengrep_sll_delete_all(void);
static int dbengrep_set_active(char *, int);
static int dbengrep_send_recv(char *);
static int dbengrep_connect(void);
int dbengrep_init(void)
{
/* Initialize replication or update replication details.
Function returns a 'dbeng' code. */
struct dbeng_table *rov = dbeng_head;
struct dbengcat_details *det;
char mname[] = "dbengrep_init";
int ret, rep_flag;
logman("%s:enter", mname);
/* replication must be active */
(void)dbeng_config_get_replicate_flag(&rep_flag);
if (!rep_flag)
{
dbeng_io_code_log(mname, "replication not active",
DBENG_REPLICATION_NOT_ACTIVE);
return(DBENG_REPLICATION_NOT_ACTIVE);
}
/* load or re-load list of other Bbuuzzb servers (excluding this one) */
if ((ret = dbengrep_sll_load()) != DBENG_OK)
{
dbeng_io_code_log(mname, "bad rc from dbengrep_sll_load", ret);
return(ret);
}
/* if no tables in use, we are done */
if (rov == DBENG_OT_NULL)
{
dbeng_io_code_log(mname, "no tables in use", DBENG_OK);
return(DBENG_OK);
}
/* go through list of tables in use and load replication details
for each table (if present) */
while(rov != DBENG_OT_NULL)
{
// load catalog details for this table
if ((ret = dbeng_catalog_init_table_details(rov->name, FALSE, &det)) !=
DBENG_OK)
{
dbeng_io_code_log(mname,
"bad rc from dbeng_catalog_init_table_details", ret);
return(ret);
}
if ((ret = dbeng_catalog_table_details(det)) != DBENG_OK)
{
dbeng_io_code_log(mname, "bad rc from dbeng_catalog_table_details",
ret);
return(ret);
}
if ((ret = dbengrep_rll_load(rov, det->rep_list)) != DBENG_OK)
if (ret != DBENG_NO_REPLICATION_ENTRY)
{
logman("%s:exit:bad rc[%d] from dbengrep_rll_load[%d]",
mname, ret, rov->tid);
return(ret);
}
if ((ret = dbeng_catalog_term_table_details(&det)) != DBENG_OK)
{
dbeng_io_code_log(mname, "bad rc from "
"dbeng_catalog_term_table_details", ret);
return(ret);
}
rov = rov->next;
}
dbengrep_hostname[0] = dbengrep_from_tname[0] = EOS;
dbengrep_port = dbengrep_from_port = 0;
dbeng_io_code_log(mname, "normal exit", DBENG_OK);
return(DBENG_OK);
}
int dbengrep_end(void)
{
/* Clear all memory allocated for replication. Function returns
a 'dbeng' code. */
struct dbeng_table *rov = dbeng_head;
char mname[] = "dbengrep_end";
int ret;
logman("%s:enter", mname);
dbengrep_sll_delete_all();
while(rov != DBENG_OT_NULL)
{
if ((ret = dbengrep_rll_delete_all(rov)) != DBENG_OK)
logman("%s:exit:bad rc[%d] from dbengrep_rll_delete_all,"
"tid=%d", mname, ret, rov->tid);
rov = rov->next;
}
dbengrep_hostname[0] = dbengrep_from_tname[0] = EOS;
dbengrep_port = dbengrep_from_port = 0;
dbeng_io_code_log(mname, "normal exit", DBENG_OK);
return(DBENG_OK);
}
int dbengrep_recv(char *com)
{
/* Process a received replication command. Received replication requests
will only be processed if the replication flag is high. Function
returns a 'dbeng' code. */
char mname[] = "dbengrep_recv", *tname, tmp[10];
int rcode, ret, len, nwords, tid, session_flag, rep_flag;
logman("%s:enter", mname);
(void)dbeng_config_get_replicate_flag(&rep_flag);
if (!rep_flag)
{
dbeng_io_code_log(mname, "replicate flag down",
DBENG_REPLICATION_NOT_ACTIVE);
return(DBENG_REPLICATION_NOT_ACTIVE);
}
if (com == (char *)NULL || !strlen(com))
{
dbeng_io_code_log(mname, "nul or empty[com]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
nwords = command_words(com);
if (nwords < 4)
{
dbeng_io_code_log(mname, "improper number of parameters",
DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
/* get replication code (command word 1) */
if (!command_word(com, tmp, 1))
{
dbeng_io_code_log(mname, "bad rc from command_word[tmp(rcode)]",
DBENG_INTERNAL_ERROR);
return(DBENG_INTERNAL_ERROR);
}
if (!qatoi(tmp, &rcode))
{
dbeng_io_code_log(mname, "non numeric[rcode]", DBENG_INTERNAL_ERROR);
return(DBENG_INTERNAL_ERROR);
}
/* get from port (command word 2) */
if (!command_word(com, tmp, 2))
{
dbeng_io_code_log(mname, "bad rc from command_word[tmp"
"(dbengrep_from_port)]", DBENG_INTERNAL_ERROR);
return(DBENG_INTERNAL_ERROR);
}
if (!qatoi(tmp, &dbengrep_from_port))
{
dbeng_io_code_log(mname, "non numeric[dbengrep_from_port]",
DBENG_INTERNAL_ERROR);
return(DBENG_INTERNAL_ERROR);
}
/* get from table name (command word 3) */
if (!command_word(com, dbengrep_from_tname, 3))
{
dbeng_io_code_log(mname, "bad rc from command_word[dbengrep_from_tname]",
DBENG_INTERNAL_ERROR);
return(DBENG_INTERNAL_ERROR);
}
/* get destination table name (command word 4) */
len = command_wordlen(com, 4);
if (len <= 0)
{
dbeng_io_code_log(mname, "bad rc from command_wordlen[tname]",
DBENG_INTERNAL_ERROR);
return(DBENG_INTERNAL_ERROR);
}
if ((tname = (char *)malloc(len + 1)) == (char *)NULL)
{
dbeng_io_code_log(mname, "alloc fail[tname]", DBENG_MEMORY_FAIL);
return(DBENG_MEMORY_FAIL);
}
if (!command_word(com, tname, 4))
{
dbeng_io_code_log(mname, "bad rc from command_word[tname]",
DBENG_INTERNAL_ERROR);
free(tname);
return(DBENG_INTERNAL_ERROR);
}
/* open the destination table */
if ((ret = dbeng_open_table(tname, &tid)) != DBENG_OK)
{
dbeng_io_code_log(mname, "bad rc from dbeng_open_table", ret);
free(tname);
return(ret);
}
free(tname);
/* get table pointer from tid */
if ((rep_table = dbeng_atid_lookup(tid)) == DBENG_OT_NULL)
{
dbeng_io_code_log(mname, "bad rc[NULL] from dbeng_atid_lookup",
DBENG_INTERNAL_ERROR);
return(DBENG_INTERNAL_ERROR);
}
/* if session table is active, the physical file will have been
closed by 'dbeng_open_table', re-open it */
(void)dbeng_config_get_session_flag(&session_flag);
if (session_flag)
if ((ret = dbeng_ll_open(rep_table)) != DBENG_OK)
{
dbeng_io_code_log(mname, "bad rc from dbeng_ll_open", ret);
return(ret);
}
/* process command */
switch(rcode)
{
case DBENG_SEND_REPLICATE_WRITE:
ret = dbengrep_recv_write(com);
break;
case DBENG_SEND_REPLICATE_DELETE:
ret = dbengrep_recv_delete(com);
break;
case DBENG_SEND_REPLICATE_PACK:
ret = dbengrep_recv_pack();
break;
default:
logman("%s:exit[%d]:unknown replication command", mname,
DBENG_INTERNAL_ERROR);
(void)dbeng_close_table(rep_table);
return(DBENG_INTERNAL_ERROR);
};
if (ret != DBENG_OK)
(void)dbeng_close_table(rep_table);
else
ret = dbeng_close_table(rep_table);
dbeng_io_code_log(mname, "normal exit", ret);
return(ret);
}
int dbengrep_send(struct dbeng_table *ot, int rep_code, char *irec)
{
/* Send a replication command to every listed available
server. 'rep_code' is the replication code and 'irec'
is the input record contents which is only used in the
case of a 'write' replication command. Function returns
a 'dbeng' code. */
struct dbeng_rep *rov;
struct dbengrep_servers *bserver;
char mname[] = "dbengrep_send", *qftname, *qdtname, *qrec, *com;
int ret, len, rep_flag, entry_changed = FALSE;
/* do not send any replication commands if the flag is down */
(void)dbeng_config_get_replicate_flag(&rep_flag);
if (!rep_flag)
{
dbeng_io_code_log(mname, "replication not active",
DBENG_REPLICATION_NOT_ACTIVE);
return(DBENG_REPLICATION_NOT_ACTIVE);
}
if (ot == DBENG_OT_NULL)
{
dbeng_io_code_log(mname, "null[ot]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
rov = ot->rep_list;
/* if replication list is empty, return with ok */
if (rov == DBENG_ROT_NULL)
{
dbeng_io_code_log(mname, "exit:replication list empty", DBENG_OK);
return(DBENG_OK);
}
logman("%s:enter:tid=%d,rep=%d", mname, ot->tid, rep_code);
/* loop through replication list, sending a replication command
to each listed server */
while(rov != DBENG_ROT_NULL)
{
/* if we are attempting to send to the same server as the
last received replication request, refuse */
if (rov->port == dbengrep_from_port &&
!strcmp(rov->tname, dbengrep_from_tname))
{
logman("%s:send[%d,%s] denied:same table as last rep recv",
mname, rov->port, rov->tname);
rov->port = 0;
entry_changed = TRUE;
rov = rov->next;
continue;
}
/* get server host name from 'dbengrep_servers' link list */
if ((bserver = dbengrep_sll_find(rov->port)) == DBENGREP_SOT_NULL)
{
logman("%s:server on port %d is not in server list", mname,
rov->port);
/* if server is missing from servers list mark the entry */
rov->port = 0;
entry_changed = TRUE;
rov = rov->next;
continue;
}
/* load host name and port number of selected server */
if ((ret = dbengrep_set_active(bserver->host, rov->port)) != DBENG_OK)
{
logman("%s:skip:bad rc from dbengrep_set_active[%s,%d]",
mname, bserver->host, rov->port);
rov = rov->next;
continue;
}
logman("%s:rep[%d]->%s[%d]", mname, rep_code, dbengrep_hostname,
dbengrep_port);
/* calculate length of command string */
len = strlen(ot->name) + 3 + strlen(rov->tname) + 3 + 9;
switch(rep_code)
{
case DBENG_SEND_REPLICATE_WRITE:
if (irec == (char *)NULL || !strlen(irec))
{
dbeng_io_code_log(mname, "null or empty[irec]",
DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
len += strlen(irec) + 3;
break;
case DBENG_SEND_REPLICATE_DELETE:
len += 11;
break;
case DBENG_SEND_REPLICATE_PACK:
break;
default:
logman("%s:exit:invalid rep_code[%d]", mname, rep_code);
return(DBENG_INVALID_FUNCTION);
};
if ((com = (char *)malloc(len)) == (char *)NULL)
{
logman("%s:exit:rep[%d]->%s[%d]:alloc fail[com]", mname,
rep_code, dbengrep_hostname, dbengrep_port);
return(DBENG_MEMORY_FAIL);
}
/* build command string parts */
if ((qftname = initqstring(ot->name)) == (char *)NULL)
{
logman("%s:exit:rep[%d]->%s[%d]:alloc fail[qftname]", mname,
rep_code, dbengrep_hostname, dbengrep_port);
free(com);
return(DBENG_MEMORY_FAIL);
}
if ((qdtname = initqstring(rov->tname)) == (char *)NULL)
{
logman("%s:exit:rep[%d]->%s[%d]:alloc fail[qdtname]", mname,
rep_code, dbengrep_hostname, dbengrep_port);
free(com);
free(qftname);
return(DBENG_MEMORY_FAIL);
}
/* build command string */
switch(rep_code)
{
case DBENG_SEND_REPLICATE_WRITE:
if ((qrec = initqstring(irec)) == (char *)NULL)
{
logman("%s:exit:rep[%d]->%s[%d]:alloc fail[irec]",
mname, rep_code, dbengrep_hostname, dbengrep_port);
free(com);
free(qftname);
free(qdtname);
return(DBENG_MEMORY_FAIL);
}
sprintf(com, "%d %d %s %s %s", rep_code, server_port,
qftname, qdtname, qrec);
free(qrec);
break;
case DBENG_SEND_REPLICATE_DELETE:
sprintf(com, "%d %d %s %s %ld", rep_code, server_port,
qftname, qdtname, ot->orig_position);
break;
case DBENG_SEND_REPLICATE_PACK:
sprintf(com, "%d %d %s %s", rep_code, server_port,
qftname, qdtname);
break;
};
free(qftname);
free(qdtname);
/* send replication command and wait for reply, do not exit
this function if command fails */
if ((ret = dbengrep_send_recv(com)) != DBENG_OK)
logman("%s:replication command failed,rc=%d", mname, ret);
free(com);
rov = rov->next;
}
/* if an error was found in the replication list, re-load */
if (entry_changed)
{
logman("%s:rep entry invalid:re-loading rep list", mname);
dbengrep_init();
}
dbeng_io_code_log(mname, "normal exit", DBENG_OK);
return(DBENG_OK);
}
int dbengrep_rll_delete_all(struct dbeng_table *ot)
{
/* Delete any replication link list nodes for a table. Function
returns a 'dbeng' code. */
struct dbeng_rep *rov, *tmp;
char mname[] = "dbengrep_rll_delete_all";
if (ot == DBENG_OT_NULL)
{
dbeng_io_code_log(mname, "null[ot]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
logman("%s:enter:tid=%d", mname, ot->tid);
rov = ot->rep_list;
while(rov != DBENG_ROT_NULL)
{
if (rov->tname != (char *)NULL)
free(rov->tname);
tmp = rov->next;
free(rov);
rov = tmp;
}
ot->rep_list = DBENG_ROT_NULL;
dbeng_io_code_log(mname, "normal exit", DBENG_OK);
return(DBENG_OK);
}
int dbengrep_rll_load(struct dbeng_table *ot, char *thelist)
{
/* Load or re-load the replication details into the
replication link list. Function returns a 'dbeng' code. */
struct dbengrep_servers *bserver;
char mname[] = "dbengrep_rll_load", *entry, *tname, port_char[10];
int ret, nentries, i, port, len;
if (ot == DBENG_OT_NULL)
{
dbeng_io_code_log(mname, "null[ot]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
if (thelist == (char *)NULL)
{
dbeng_io_code_log(mname, "null[thelist]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
logman("%s:enter:tid=%d", mname, ot->tid);
/* clear any list members if present */
if ((ret = dbengrep_rll_delete_all(ot)) != DBENG_OK)
{
dbeng_io_code_log(mname, "bad rc from dbengrep_rll_delete_all", ret);
return(ret);
}
len = strlen(thelist);
if (!len)
{
dbeng_io_code_log(mname, "no rep entries", DBENG_NO_REPLICATION_ENTRY);
return(DBENG_NO_REPLICATION_ENTRY);
}
nentries = ll_words(thelist, DBENG_LIST_DELIM);
logman("%s:got %d replication entries", mname, nentries);
if (nentries <= 0)
{
dbeng_io_code_log(mname, "bad nentires", DBENG_INTERNAL_ERROR);
return(DBENG_INTERNAL_ERROR);
}
if ((entry = (char *)malloc(len + 1)) == (char *)NULL)
{
dbeng_io_code_log(mname, "alloc fail[entry]", DBENG_MEMORY_FAIL);
return(DBENG_MEMORY_FAIL);
}
if ((tname = (char *)malloc(len + 1)) == (char *)NULL)
{
dbeng_io_code_log(mname, "alloc fail[tname]", DBENG_MEMORY_FAIL);
free(entry);
return(DBENG_MEMORY_FAIL);
}
/* loop through list matching the port number given in the catalog
to the list of available Bbuuzzb servers, if there is a match,
add the entry to the replication link list */
for(i = 1; i <= nentries; i++)
{
if (!ll_word(thelist, entry, i, DBENG_LIST_DELIM))
{
logman("%s:exit:bad rc[FALSE] from ll_word[%d]", mname, i);
free(entry);
free(tname);
return(DBENG_INTERNAL_ERROR);
}
if (!ll_word(entry, port_char, 1, ','))
{
logman("%s:exit:bad rc[FALSE] from ll_word[%d,port_char]",
mname, i);
free(entry);
free(tname);
return(DBENG_INTERNAL_ERROR);
}
if (!qatoi(port_char, &port))
{
logman("%s:exit:bad rc[FALSE] from qatoi[%d,port]", mname, i);
free(entry);
free(tname);
return(DBENG_INTERNAL_ERROR);
}
if ((bserver = dbengrep_sll_find(port)) != DBENGREP_SOT_NULL)
{
if (!ll_word(entry, tname, 2, ','))
{
logman("%s:exit:bad rc[FALSE] from ll_word[%d,tname]",
mname, i);
free(entry);
free(tname);
return(DBENG_INTERNAL_ERROR);
}
logman("%s:found match:p=%d,t=%s", mname, port, tname);
if ((ret = dbengrep_rll_add(ot, tname, port)) != DBENG_OK)
{
logman("%s:exit:bad rc[%d] from dbengrep_rll_add[%d]",
mname, ret, i);
free(entry);
free(tname);
return(ret);
}
}
}
free(entry);
free(tname);
dbeng_io_code_log(mname, "normal exit", DBENG_OK);
return(DBENG_OK);
}
/* private functions */
/* replication receive and process command functions */
static int dbengrep_recv_write(char *com)
{
char mname[] = "dbengrep_recv_write", *irec;
int ret, nwords;
logman("%s:enter", mname);
if (com == (char *)NULL || !strlen(com))
{
dbeng_io_code_log(mname, "null[com]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
nwords = command_words(com);
if (nwords < 5)
{
dbeng_io_code_log(mname, "exit:improper number of command words",
DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
/* get record contents (command word 5) */
if ((irec = (char *)malloc(strlen(com) + 1)) == (char *)NULL)
{
dbeng_io_code_log(mname, "alloc fail[irec]", DBENG_MEMORY_FAIL);
return(DBENG_MEMORY_FAIL);
}
if (!command_word(com, irec, 5))
{
dbeng_io_code_log(mname, "exit:bad rc[FALSE] from command_word[irec]",
DBENG_INTERNAL_ERROR);
free(irec);
return(DBENG_INTERNAL_ERROR);
}
/* write a new record */
if ((ret = dbeng_write_string_recd(rep_table, irec)) != DBENG_OK)
{
dbeng_io_code_log(mname, "exit:bad rc from dbeng_write_string_recd", ret);
free(irec);
return(ret);
}
free(irec);
dbeng_io_code_log(mname, "normal exit", ret);
return(ret);
}
static int dbengrep_recv_delete(char *com)
{
char mname[] = "dbengrep_recv_delete";
long fpos;
int ret, nwords;
logman("%s:enter", mname);
if (com == (char *)NULL || !strlen(com))
{
dbeng_io_code_log(mname, "null[com]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
nwords = command_words(com);
if (nwords < 5)
{
dbeng_io_code_log(mname, "exit:improper number of command words",
DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
/* get file position (command word 5) */
if ((fpos = dbengrep_recv_get_fpos(com)) == -1L)
{
dbeng_io_code_log(mname, "exit:bad rc[-1] from dbengep_recv_get_fpos",
DBENG_INTERNAL_ERROR);
return(DBENG_INTERNAL_ERROR);
}
/* set file position */
if ((ret = dbeng_set_pos(rep_table, fpos)) != DBENG_OK)
{
dbeng_io_code_log(mname, "exit:bad rc from dbeng_set_pos", ret);
return(ret);
}
/* get the record */
if ((ret = dbeng_get_recd(rep_table)) != DBENG_OK)
{
dbeng_io_code_log(mname, "exit:bad rc from dbeng_get_recd", ret);
return(ret);
}
/* delete the current record */
if ((ret = dbeng_delete_recd(rep_table)) != DBENG_OK)
{
dbeng_io_code_log(mname, "exit:bad rc from dbeng_delete_recd", ret);
return(ret);
}
dbeng_io_code_log(mname, "normal exit", ret);
return(ret);
}
static int dbengrep_recv_pack(void)
{
char mname[] = "dbengrep_recv_pack";
int ret;
logman("%s:enter", mname);
if ((ret = dbeng_pack_table(rep_table)) != DBENG_OK)
{
dbeng_io_code_log(mname, "exit:bad rc from dbeng_pack_table", ret);
return(ret);
}
dbeng_io_code_log(mname, "normal exit", ret);
return(ret);
}
static long dbengrep_recv_get_fpos(char *com)
{
/* Obtain the file position from the received replication
request which is always in command word six. Function
returns the file position upon success, -1 otherwise. */
char mname[] = "dbengrep_recv_get_fpos", fpos_char[25];
long fpos;
int nwords;
if (com == (char *)NULL || !strlen(com))
return(-1L);
nwords = command_words(com);
if (nwords < 5)
return(-1L);
if (!command_word(com, fpos_char, 5))
return(-1L);
if (!qatol(fpos_char, &fpos))
return(-1L);
return(fpos);
}
/* replication link list functions */
static int dbengrep_rll_add(struct dbeng_table *ot, char *tname, int port_num)
{
/* Add a replication entry. Function returns
DBENG'_OK' upon success, a 'dbeng' code
otherwise. */
struct dbeng_rep *dbengrep_entry;
struct dbeng_rep *rov;
char mname[] = "dbengrep_rll_add";
int ret, len, size;
size = sizeof(struct dbeng_rep);
/* reasonableness checks */
if (ot == DBENG_OT_NULL)
{
dbeng_io_code_log(mname, "null[ot]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
if (tname == (char *)NULL || !strlen(tname))
{
dbeng_io_code_log(mname, "null or empty[tname]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
if (port_num <= 0)
{
dbeng_io_code_log(mname, "out of range[port_num]",
DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
len = strlen(tname);
logman("%s:enter:tid=%d,tname=%s,l=%d,port_num=%d", mname, ot->tid,
tname, len, port_num);
if ((dbengrep_entry = (struct dbeng_rep *)malloc(size))
== DBENG_ROT_NULL)
{
dbeng_io_code_log(mname, "alloc fail[dbengrep_entry]",
DBENG_MEMORY_FAIL);
return(DBENG_MEMORY_FAIL);
}
if ((dbengrep_entry->tname = (char *)malloc(len + 1)) == (char *)NULL)
{
dbeng_io_code_log(mname, "alloc fail[dbengrep_entry->tname]",
DBENG_MEMORY_FAIL);
free(dbengrep_entry);
return(DBENG_MEMORY_FAIL);
}
strcpy(dbengrep_entry->tname, tname);
dbengrep_entry->port = port_num;
dbengrep_entry->next = DBENG_ROT_NULL;
/* set head of list if this is the first entry */
if (ot->rep_list == DBENG_ROT_NULL)
{
ot->rep_list = dbengrep_entry;
dbeng_io_code_log(mname, "normal exit at head of list", DBENG_OK);
return(DBENG_OK);
}
/* find last entry and hook into it */
rov = ot->rep_list;
while(rov->next != DBENG_ROT_NULL)
rov = rov->next;
rov->next = dbengrep_entry;
dbeng_io_code_log(mname, "normal exit", DBENG_OK);
return(DBENG_OK);
}
static struct dbeng_rep *dbengrep_rll_find(struct dbeng_table *ot,
int port_num)
{
/* Locate a replication node with the port 'port_num' within
the table 'ot'. Function returns a pointer to the replication
node upon success, a null pointer otherwise. */
struct dbeng_rep *rov;
char mname[] = "dbengrep_rll_find";
if (ot == DBENG_OT_NULL)
{
logman("%s:exit:null[ot]", mname);
return(DBENG_ROT_NULL);
}
logman("%s:enter:tid=%d,port=%d", mname, ot->tid, port_num);
rov = ot->rep_list;
while(rov != DBENG_ROT_NULL)
{
if (rov->port == port_num)
{
logman("%s:exit:found match", mname);
return(rov);
}
rov = rov->next;
}
logman("%s:exit:no match", mname);
return(DBENG_ROT_NULL);
}
static int dbengrep_rll_delete(struct dbeng_table *ot, int port_num)
{
/* Delete a replication entry.
Function returns 'DBENG_OK' upon success, a
'dbeng' code otherwise. */
struct dbeng_rep *rov;
struct dbeng_rep *prev;
char mname[] = "dbengrep_rll_delete";
if (ot == DBENG_OT_NULL)
{
dbeng_io_code_log(mname, "null[ot]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
logman("%s:enter:tid=%d,port=%d", mname, ot->tid, port_num);
rov = prev = ot->rep_list;
while(rov != DBENG_ROT_NULL)
{
if (port_num == rov->port)
break;
else
{
prev = rov;
rov = rov->next;
}
}
/* if no match, error */
if (rov == DBENG_ROT_NULL)
{
dbeng_io_code_log(mname, "entry not found", DBENG_NOT_EXIST);
return(DBENG_NOT_EXIST);
}
/* if we are deleting the head entry, set new head */
if (rov == ot->rep_list)
ot->rep_list = rov->next;
else
prev->next = rov->next;
if (rov->tname != (char *)NULL)
free(rov->tname);
free(rov);
dbeng_io_code_log(mname, "normal exit", DBENG_OK);
return(DBENG_OK);
}
/* available Bbuuzzb server link list functions */
static int dbengrep_sll_load(void)
{
/* Load the list of available 'Bbuuzzb' servers from 'socloc'
into the link list. This server denoted by
the port 'server_port' will be excluded from the list. */
char *thelist, mname[] = "dbengrep_sll_load", mes[128];
char *entry, *host, port_char[25];
int ret, nwords, i, len, port;
logman("%s:enter", mname);
/* make sure 'socloc' has already been initialized */
if ((ret = sloc_is_init()) != SL_OK)
{
sl_code_string(ret, mes);
logman("%s:bad rc[%d,%s][sloc_is_init]", mname,
ret, mes);
return(DBENG_SOCLOC_NO_INIT);
}
/* if the link list is not empty, clear it */
if (dbengrep_shead != DBENGREP_SOT_NULL)
dbengrep_sll_delete_all();
if ((thelist = (char *)malloc(SL_MAXCOMMAND)) == (char *)NULL)
{
dbeng_io_code_log(mname, "alloc fail[thelist]", DBENG_MEMORY_FAIL);
return(DBENG_MEMORY_FAIL);
}
/* get list of all 'Bbuuzzb' servers from 'socloc' */
if ((ret = sloc_find_list(DBENG_SERVICE_NAME, thelist)) != SL_OK)
{
free(thelist);
sl_code_string(ret, mes);
logman("%s:bad rc[%s] from sloc_find_list", mname, mes);
return(DBENG_NO_SERVER);
}
len = strlen(thelist);
nwords = ll_words(thelist, SL_LIST_DELIM);
if (nwords < 1)
{
free(thelist);
dbeng_io_code_log(mname, "socloc server list empty", DBENG_NO_SERVER);
return(DBENG_NO_SERVER);
}
if ((entry = (char *)malloc(len + 1)) == (char *)NULL)
{
free(thelist);
dbeng_io_code_log(mname, "alloc fail[entry]", DBENG_MEMORY_FAIL);
return(DBENG_MEMORY_FAIL);
}
/* go through list and add each server to link list (except this server) */
for(i = 1; i <= nwords; i++)
{
if (!ll_word(thelist, entry, i, SL_LIST_DELIM))
{
free(thelist);
free(entry);
logman("%s:bad rc[FALSE] from ll_word[entry=%d]", mname, i);
return(DBENG_INTERNAL_ERROR);
}
if (!command_word(entry, port_char, 3))
{
free(thelist);
free(entry);
logman("%s:bad rc[FALSE] from command_word[port_char,entry=%d]",
mname, i);
return(DBENG_INTERNAL_ERROR);
}
if (!qatoi(port_char, &port))
{
free(thelist);
free(entry);
logman("%s:bad rc[FALSE] from qatoi[port_char,entry=%d]",
mname, i);
return(DBENG_INTERNAL_ERROR);
}
/* do not add this server to the link list */
if (port != server_port)
{
len = strlen(entry);
if ((host = (char *)malloc(len + 1)) == (char *)NULL)
{
free(thelist);
free(entry);
dbeng_io_code_log(mname, "alloc fail[host]", DBENG_MEMORY_FAIL);
return(DBENG_MEMORY_FAIL);
}
if (!command_word(entry, host, 2))
{
free(thelist);
free(entry);
free(host);
logman("%s:bad rc[FALSE] from command_word[host,entry=%d",
mname, i);
return(DBENG_INTERNAL_ERROR);
}
if ((ret = dbengrep_sll_add(host, port)) != DBENG_OK)
{
free(thelist);
free(entry);
free(host);
dbeng_io_code_log(mname, "bad rc from dbengrep_ll_add", ret);
return(ret);
}
free(host);
}
}
free(entry);
free(thelist);
if (dbengrep_shead == DBENGREP_SOT_NULL)
{
dbeng_io_code_log(mname, "exit:no other Bbuuzzb servers",
DBENG_NO_SERVER);
return(DBENG_NO_SERVER);
}
dbeng_io_code_log(mname, "normal exit", DBENG_OK);
return(DBENG_OK);
}
static int dbengrep_sll_add(char *hname, int port_num)
{
/* Add a Bbuuzzb server entry. Function returns
DBENG'_OK' upon success, a 'dbeng' code
otherwise. */
struct dbengrep_servers *dbengrep_entry;
struct dbengrep_servers *rov;
char mname[] = "dbengrep_sll_add";
int ret, len, size;
size = sizeof(struct dbengrep_servers);
/* reasonableness checks */
if (hname == (char *)NULL || !strlen(hname))
{
dbeng_io_code_log(mname, "invalid parameter[hname]",
DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
if (port_num <= 0)
{
dbeng_io_code_log(mname, "port out of range", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
logman("%s:enter:hname=%s,port_num=%d", mname, hname, port_num);
if ((dbengrep_entry = (struct dbengrep_servers *)malloc(size))
== DBENGREP_SOT_NULL)
{
dbeng_io_code_log(mname, "alloc fail[dbengrep_entry]",
DBENG_MEMORY_FAIL);
return(DBENG_MEMORY_FAIL);
}
len = strlen(hname);
if ((dbengrep_entry->host = (char *)malloc(len + 1)) == (char *)NULL)
{
dbeng_io_code_log(mname, "alloc fail[dbengrep_entry->host]",
DBENG_MEMORY_FAIL);
free(dbengrep_entry);
return(DBENG_MEMORY_FAIL);
}
strcpy(dbengrep_entry->host, hname);
dbengrep_entry->port = port_num;
dbengrep_entry->next = DBENGREP_SOT_NULL;
/* set head of list if this is the first entry */
if (dbengrep_shead == DBENGREP_SOT_NULL)
{
dbengrep_shead = dbengrep_entry;
dbeng_io_code_log(mname, "normal exit at head of list", DBENG_OK);
return(DBENG_OK);
}
/* find last entry and hook into it */
rov = dbengrep_shead;
while(rov->next != DBENGREP_SOT_NULL)
rov = rov->next;
rov->next = dbengrep_entry;
dbeng_io_code_log(mname, "normal exit", DBENG_OK);
return(DBENG_OK);
}
static int dbengrep_sll_delete(int port_num)
{
/* Delete a Bbuuzzb server entry. Since the TCP/IP
port is unique, this is used as the entry key.
Function returns 'DBENG_OK' upon success, a
'dbeng' code otherwise. */
struct dbengrep_servers *rov;
struct dbengrep_servers *prev;
char mname[] = "dbengrep_sll_delete";
logman("%s:enter,port=%d", mname, port_num);
rov = prev = dbengrep_shead;
while(rov != DBENGREP_SOT_NULL)
{
if (port_num == rov->port)
break;
else
{
prev = rov;
rov = rov->next;
}
}
/* if no match, error */
if (rov == DBENGREP_SOT_NULL)
{
dbeng_io_code_log(mname, "entry not found", DBENG_NOT_EXIST);
return(DBENG_NOT_EXIST);
}
/* if we are deleting the head entry, set new head */
if (rov == dbengrep_shead)
dbengrep_shead = rov->next;
else
prev->next = rov->next;
free(rov->host);
free(rov);
dbeng_io_code_log(mname, "normal exit", DBENG_OK);
return(DBENG_OK);
}
static struct dbengrep_servers *dbengrep_sll_find(int port_num)
{
/* Locate an available Bbuuzzb server (not this one). Function
returns a pointer to the available server upon success,
'DBENGREP_SOT_NULL' otherwise. */
struct dbengrep_servers *rov = dbengrep_shead;
char mname[] = "dbengrep_sll_find";
logman("%s:enter:port_num=%d", mname, port_num);
if (port_num <= 0)
{
logman("%s:exit:out of range[port_num]", mname);
return(DBENGREP_SOT_NULL);
}
while(rov != DBENGREP_SOT_NULL)
{
if (rov->port == port_num)
{
logman("%s:normal exit:found match", mname);
return(rov);
}
rov = rov->next;
}
logman("%s:normal exit:no match found", mname);
return(DBENGREP_SOT_NULL);
}
static void dbengrep_sll_delete_all(void)
{
/* Deallocate all list members and set link list head
pointer to NULL. */
struct dbengrep_servers *rov = dbengrep_shead, *tmp;
char mname[] = "dbengrep_sll_delete_all";
logman("%s:enter", mname);
if (dbengrep_shead == DBENGREP_SOT_NULL)
{
logman("%s:exit:list empty", mname);
return;
}
while(rov != DBENGREP_SOT_NULL)
{
free(rov->host);
tmp = rov->next;
free(rov);
rov = tmp;
}
dbengrep_shead = DBENGREP_SOT_NULL;
logman("%s:normal exit", mname);
}
static int dbengrep_set_active(char *hname, int port)
{
/* Set the active destination Bbuuzzb server host name and
port number. Function returns a 'dbeng' code. */
char mname[] = "dbengrep_set_active";
logman("%s:enter", mname);
if (hname == (char *)NULL || !strlen(hname))
{
dbeng_io_code_log(mname, "null or empty[hname]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
if (port <= 0)
{
dbeng_io_code_log(mname, "out of range[port]", DBENG_INVALID_FUNCTION);
return(DBENG_INVALID_FUNCTION);
}
strcpy(dbengrep_hostname, hname);
dbengrep_port = port;
dbeng_io_code_log(mname, "normal exit", DBENG_OK);
return(DBENG_OK);
}
static int dbengrep_send_recv(char *com)
{
/* Send and receive a message to the Bbuuzzb server that returns no
other value except for the general io return code. Function
returns a DBENG io code. */
char *mess;
int i, len;
if (!dbengrep_port)
return(DBENG_NO_SERVER);
if (com == (char *)NULL || !strlen(com))
return(DBENG_INVALID_FUNCTION);
// connect to server
if (!dbengrep_connect())
return(DBENG_VC_ERROR);
len = strlen(com) + 5;
if ((mess = (char *)malloc(len)) == (char *)NULL)
{
#ifdef OS_WIN32
(void)closesocket(dbengrep_clientSocket);
#else
close(dbengrep_clientSocket);
#endif
return(DBENG_MEMORY_FAIL);
}
memset(mess, 0, len);
sprintf(mess, "%d %s", DBENG_SEND_REPLICATE, com);
// send message
if (!ipc_send(dbengrep_clientSocket, mess))
{
#ifdef OS_WIN32
(void)closesocket(dbengrep_clientSocket);
#else
close(dbengrep_clientSocket);
#endif
free(mess);
return(DBENG_VC_ERROR);
}
memset(mess, 0, len);
// receive return code
if (!ipc_recv(dbengrep_clientSocket, mess))
{
#ifdef OS_WIN32
(void)closesocket(dbengrep_clientSocket);
#else
close(dbengrep_clientSocket);
#endif
free(mess);
return(DBENG_VC_ERROR);
}
// make sure its a number
if (!qatoi(mess, &i))
{
#ifdef OS_WIN32
(void)closesocket(dbengrep_clientSocket);
#else
close(dbengrep_clientSocket);
#endif
free(mess);
return(DBENG_INTERNAL_ERROR);
}
#ifdef OS_WIN32
(void)closesocket(dbengrep_clientSocket);
#else
close(dbengrep_clientSocket);
#endif
free(mess);
return(i);
}
static int dbengrep_connect(void)
{
/* Connect to a Bbuuzzb server. An attempt to open the TCP
socket is made. The host name 'dbengrep_hostname' and the
TCP port 'dbengrep_port' are assumed to be already loaded.
Function returns 'TRUE' if a successful connection was made,
'FALSE' otherwise. */
// make sure host name and port are loaded
if (!dbengrep_port || !strlen(dbengrep_hostname))
return(FALSE);
// resolve server host name
dbengrep_lpHostEnt = gethostbyname(dbengrep_hostname);
if (!dbengrep_lpHostEnt)
return(FALSE);
// create the socket
#ifdef OS_WIN32
dbengrep_clientSocket = socket(PF_INET, SOCK_STREAM, DEFAULT_PROTOCOL);
#endif
#ifdef OS_UNIX
dbengrep_clientSocket = socket(AF_INET, SOCK_STREAM, DEFAULT_PROTOCOL);
#endif
if (dbengrep_clientSocket == INVALID_SOCKET)
return(FALSE);
// load client address data
memset(&dbengrep_sockClientAddr, 0, sizeof(dbengrep_sockClientAddr));
dbengrep_sockClientAddr.sin_family = AF_INET;
dbengrep_sockClientAddr.sin_port = htons(dbengrep_port);
#ifdef OS_WIN32
dbengrep_sockClientAddr.sin_addr =
*((LPIN_ADDR)*dbengrep_lpHostEnt->h_addr_list);
#endif
#ifdef OS_UNIX
dbengrep_sockClientAddr.sin_addr.s_addr =
((struct in_addr *)(dbengrep_lpHostEnt->h_addr))->s_addr;
#endif
// connect to server
#ifdef OS_WIN32
if (connect(dbengrep_clientSocket, (LPSOCKADDR)&dbengrep_sockClientAddr,
sizeof(dbengrep_sockClientAddr)))
#endif
#ifdef OS_UNIX
if (connect(dbengrep_clientSocket, (SA *)&dbengrep_sockClientAddr,
sizeof(dbengrep_sockClientAddr)) == SOCKET_ERROR)
#endif
return(FALSE);
return(TRUE);
}