Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 212 additions & 0 deletions deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@
* query, which is useful at the time of execution to prepare MongoDB query.
*/
static void mongo_check_op_expr(OpExpr *node, MongoRelQualInfo *qual_info);
static void mongo_check_scalar_array_op_expr(ScalarArrayOpExpr *node,
MongoRelQualInfo *qual_info);
static void mongo_check_var(Var *column, MongoRelQualInfo *qual_info);

/* Helper functions to form MongoDB query document. */
static void mongo_append_bool_expr(BoolExpr *node, BSON *queryDoc,
pipeline_cxt *context);
static void mongo_append_op_expr(OpExpr *node, BSON *child,
pipeline_cxt *context);
static void mongo_append_scalar_array_op_expr(ScalarArrayOpExpr *node,
BSON *child_doc,
pipeline_cxt *context);
static void mongo_append_column_name(Var *column, BSON *queryDoc,
pipeline_cxt *context);
static void mongo_add_null_check(Var *column, BSON *expr,
Expand Down Expand Up @@ -151,6 +156,9 @@ mongo_check_qual(Expr *node, MongoRelQualInfo *qual_info)
}
}
break;
case T_ScalarArrayOpExpr:
mongo_check_scalar_array_op_expr((ScalarArrayOpExpr *) node, qual_info);
break;
case T_Const:
case T_Param:
/* Nothing to do here because we are looking only for Var's */
Expand Down Expand Up @@ -204,6 +212,48 @@ mongo_check_op_expr(OpExpr *node, MongoRelQualInfo *qual_info)
ReleaseSysCache(tuple);
}

/*
* mongo_check_scalar_array_op_expr
* Check given scalar array operator expression.
*/
static void
mongo_check_scalar_array_op_expr(ScalarArrayOpExpr *node, MongoRelQualInfo *qual_info)
{
HeapTuple tuple;
Form_pg_operator form;
char oprkind;
ListCell *arg;

/* Retrieve information about the operator from the system catalog. */
tuple = SearchSysCache1(OPEROID, ObjectIdGetDatum(node->opno));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for operator %u", node->opno);

form = (Form_pg_operator) GETSTRUCT(tuple);
oprkind = form->oprkind;

/* Sanity check. */
Assert((oprkind == 'r' && list_length(node->args) == 1) ||
(oprkind == 'l' && list_length(node->args) == 1) ||
(oprkind == 'b' && list_length(node->args) == 2));

/* Deparse left operand. */
if (oprkind == 'r' || oprkind == 'b')
{
arg = list_head(node->args);
mongo_check_qual(lfirst(arg), qual_info);
}

/* Deparse right operand. */
if (oprkind == 'l' || oprkind == 'b')
{
arg = list_tail(node->args);
mongo_check_qual(lfirst(arg), qual_info);
}

ReleaseSysCache(tuple);
}

/*
* mongo_check_var
* Check the given Var and append required information related to columns
Expand Down Expand Up @@ -349,6 +399,9 @@ mongo_append_expr(Expr *node, BSON *child_doc, pipeline_cxt *context)
case T_Aggref:
bsonAppendUTF8(child_doc, "0", "$v_having");
break;
case T_ScalarArrayOpExpr:
mongo_append_scalar_array_op_expr((ScalarArrayOpExpr *) node, child_doc, context);
break;
default:
elog(ERROR, "unsupported expression type to append: %d",
(int) nodeTag(node));
Expand Down Expand Up @@ -552,6 +605,165 @@ mongo_append_op_expr(OpExpr *node, BSON *child_doc, pipeline_cxt *context)
ReleaseSysCache(tuple);
}

/*
* mongo_append_scalar_array_op_expr
* Deparse given scalar array operator expression.
*
* Build and append following syntax into $and array:
*
* {"$in": [ "$age", ... ] }
*
* or for negated (<> ALL) version:
*
* {"$not": {"$in": [ "$age", ... ] }}
*
* Each element of operator (e.g. "$eq") array is appended by function called
* mongo_append_column_name.
*
* In MongoDB, (null = null), (null < 1) is TRUE but that is FALSE in Postgres.
* To eliminate null value rows, add equality check for null values for columns
* involved in JOIN and WHERE clauses. E.g. add the following syntax:
*
* {"$ne": [ "$age", null ]}
*/
static void
mongo_append_scalar_array_op_expr(ScalarArrayOpExpr *node, BSON *child_doc, pipeline_cxt *context)
{
HeapTuple tuple;
Form_pg_operator form;
char oprkind;
ListCell *arg;
BSON expr;
BSON child1;
BSON child_not;
char *mongo_operator;
int saved_array_index;
int reset_index = 0;
int and_index = 0;
BSON and_op;
BSON and_obj;

/* Increament operator expression count */
context->opExprCount++;

/* Retrieve information about the operator from the system catalog. */
tuple = SearchSysCache1(OPEROID, ObjectIdGetDatum(node->opno));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for operator %u", node->opno);

form = (Form_pg_operator) GETSTRUCT(tuple);
oprkind = form->oprkind;

/* Sanity check. */
Assert((oprkind == 'r' && list_length(node->args) == 1) ||
(oprkind == 'l' && list_length(node->args) == 1) ||
(oprkind == 'b' && list_length(node->args) == 2));

if (context->isBoolExpr == true)
{
bsonAppendStartObject(child_doc, psprintf("%d", and_index++),
&and_obj);
bsonAppendStartArray(&and_obj, "$and", &and_op);
bsonAppendStartObject(&and_op, psprintf("%d", context->arrayIndex),
&expr);
}
else
bsonAppendStartObject(child_doc, psprintf("%d", context->arrayIndex),
&expr);

/* Deparse operator name. */
mongo_operator = "$in";//mongo_operator_name(psprintf("%s %s",get_opname(node->opno), node->useOr ? "ANY" : "ALL"));
if (node->useOr == false) {
bsonAppendStartObject(&expr, "$not", &child_not);
bsonAppendStartArray(&child_not, mongo_operator, &child1);
} else {
bsonAppendStartArray(&expr, mongo_operator, &child1);
}


/* Save array index */
saved_array_index = context->arrayIndex;

/* Reset to zero to be used for nested arrays */
context->arrayIndex = reset_index;

/* Deparse left operand. */
if (oprkind == 'r' || oprkind == 'b')
{
arg = list_head(node->args);
mongo_append_expr(lfirst(arg), &child1, context);
}

/* Deparse right operand. */
if (oprkind == 'l' || oprkind == 'b')
{
if (oprkind == 'l')
context->arrayIndex = reset_index;
else
context->arrayIndex++;
arg = list_tail(node->args);
mongo_append_expr(lfirst(arg), &child1, context);
}

/* Decreament operator expression count */
context->opExprCount--;


if (node->useOr == false) {
bsonAppendFinishArray(&child_not, &child1);
bsonAppendFinishObject(&expr, &child_not);
} else {
bsonAppendFinishArray(&expr, &child1);
}
if (context->isBoolExpr)
bsonAppendFinishObject(&and_op, &expr);
else
bsonAppendFinishObject(child_doc, &expr);

/*
* Add equality check for null values for columns involved in JOIN and
* WHERE clauses.
*/
if (context->opExprCount == 0)
{
List *var_list;
ListCell *lc;

var_list = pull_var_clause((Node *) node, PVC_RECURSE_PLACEHOLDERS ||
PVC_RECURSE_AGGREGATES);

foreach(lc, var_list)
{
Var *var = (Var *) lfirst(lc);

if (context->isBoolExpr)
bsonAppendStartObject(&and_op, psprintf("%d", and_index++),
&expr);
else
bsonAppendStartObject(child_doc,
psprintf("%d", context->arrayIndex++),
&expr);
mongo_add_null_check(var, &expr, context);

if (context->isBoolExpr)
bsonAppendFinishObject(&and_op, &expr);
else
bsonAppendFinishObject(child_doc, &expr);
}
}

if (context->isBoolExpr == true)
{
bsonAppendFinishArray(&and_obj, &and_op);
bsonAppendFinishObject(child_doc, &and_obj);
}

/* Retain array index */
context->arrayIndex = saved_array_index;

ReleaseSysCache(tuple);
}

/*
* mongo_append_column_name
* Deparse Var and append corresponding column name to operator array.
Expand Down
12 changes: 6 additions & 6 deletions mongo_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,15 +425,15 @@ mongoGetForeignRelSize(PlannerInfo *root,

/*
* Identify which baserestrictinfo clauses can be sent to the remote
* server and which can't. Only the OpExpr clauses are sent to the remote
* server.
* server and which can't. Only the OpExpr and ScalarArrayOpExpr clauses
* are sent to the remote server.
*/
foreach(lc, baserel->baserestrictinfo)
{
RestrictInfo *ri = (RestrictInfo *) lfirst(lc);

#ifndef META_DRIVER
if (IsA(ri->clause, OpExpr) &&
if ((IsA(ri->clause, OpExpr) || (IsA(ri->clause, ScalarArrayOpExpr)) &&
mongo_is_foreign_expr(root, baserel, ri->clause, false))
#else
if (mongo_is_foreign_expr(root, baserel, ri->clause, false))
Expand Down Expand Up @@ -741,8 +741,8 @@ mongoGetForeignPlan(PlannerInfo *root,
* previously determined to be safe or unsafe are shown in
* fpinfo->remote_conds and fpinfo->local_conds. Anything else in the
* restrictionClauses list will be a join clause, which we have to check
* for remote-safety. Only the OpExpr clauses are sent to the remote
* server.
* for remote-safety. Only the OpExpr and ScalarArrayOpExpr clauses
* are sent to the remote server.
*/
foreach(lc, restrictionClauses)
{
Expand All @@ -758,7 +758,7 @@ mongoGetForeignPlan(PlannerInfo *root,
remote_exprs = lappend(remote_exprs, rinfo->clause);
else if (list_member_ptr(fpinfo->local_conds, rinfo))
local_exprs = lappend(local_exprs, rinfo->clause);
else if (IsA(rinfo->clause, OpExpr) &&
else if ((IsA(rinfo->clause, OpExpr) || IsA(rinfo->clause, ScalarArrayOpExpr)) &&
mongo_is_foreign_expr(root, foreignrel, rinfo->clause, false))
remote_exprs = lappend(remote_exprs, rinfo->clause);
else
Expand Down
1 change: 1 addition & 0 deletions mongo_fdw.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@

/* Defines for sending queries and converting types */
#define EQUALITY_OPERATOR_NAME "="
#define INEQUALITY_OPERATOR_NAME "<>"
#define INITIAL_ARRAY_CAPACITY 8
#define MONGO_TUPLE_COST_MULTIPLIER 5
#define MONGO_CONNECTION_COST_MULTIPLIER 5
Expand Down
Loading