diff --git a/deparse.c b/deparse.c index e7ad9a4..cfe8a68 100644 --- a/deparse.c +++ b/deparse.c @@ -45,6 +45,8 @@ * query, which is useful at the time of execution to prepare MongoDB query. */ static void mongo_check_op_expr(OpExpr *node, MongoRelQualInfo *qual_info); +static void mongo_check_scalar_array_op_expr(ScalarArrayOpExpr *node, + MongoRelQualInfo *qual_info); static void mongo_check_var(Var *column, MongoRelQualInfo *qual_info); /* Helper functions to form MongoDB query document. */ @@ -52,6 +54,9 @@ static void mongo_append_bool_expr(BoolExpr *node, BSON *queryDoc, pipeline_cxt *context); static void mongo_append_op_expr(OpExpr *node, BSON *child, pipeline_cxt *context); +static void mongo_append_scalar_array_op_expr(ScalarArrayOpExpr *node, + BSON *child_doc, + pipeline_cxt *context); static void mongo_append_column_name(Var *column, BSON *queryDoc, pipeline_cxt *context); static void mongo_add_null_check(Var *column, BSON *expr, @@ -151,6 +156,9 @@ mongo_check_qual(Expr *node, MongoRelQualInfo *qual_info) } } break; + case T_ScalarArrayOpExpr: + mongo_check_scalar_array_op_expr((ScalarArrayOpExpr *) node, qual_info); + break; case T_Const: case T_Param: /* Nothing to do here because we are looking only for Var's */ @@ -204,6 +212,48 @@ mongo_check_op_expr(OpExpr *node, MongoRelQualInfo *qual_info) ReleaseSysCache(tuple); } +/* + * mongo_check_scalar_array_op_expr + * Check given scalar array operator expression. + */ +static void +mongo_check_scalar_array_op_expr(ScalarArrayOpExpr *node, MongoRelQualInfo *qual_info) +{ + HeapTuple tuple; + Form_pg_operator form; + char oprkind; + ListCell *arg; + + /* Retrieve information about the operator from the system catalog. */ + tuple = SearchSysCache1(OPEROID, ObjectIdGetDatum(node->opno)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for operator %u", node->opno); + + form = (Form_pg_operator) GETSTRUCT(tuple); + oprkind = form->oprkind; + + /* Sanity check. */ + Assert((oprkind == 'r' && list_length(node->args) == 1) || + (oprkind == 'l' && list_length(node->args) == 1) || + (oprkind == 'b' && list_length(node->args) == 2)); + + /* Deparse left operand. */ + if (oprkind == 'r' || oprkind == 'b') + { + arg = list_head(node->args); + mongo_check_qual(lfirst(arg), qual_info); + } + + /* Deparse right operand. */ + if (oprkind == 'l' || oprkind == 'b') + { + arg = list_tail(node->args); + mongo_check_qual(lfirst(arg), qual_info); + } + + ReleaseSysCache(tuple); +} + /* * mongo_check_var * Check the given Var and append required information related to columns @@ -349,6 +399,9 @@ mongo_append_expr(Expr *node, BSON *child_doc, pipeline_cxt *context) case T_Aggref: bsonAppendUTF8(child_doc, "0", "$v_having"); break; + case T_ScalarArrayOpExpr: + mongo_append_scalar_array_op_expr((ScalarArrayOpExpr *) node, child_doc, context); + break; default: elog(ERROR, "unsupported expression type to append: %d", (int) nodeTag(node)); @@ -552,6 +605,165 @@ mongo_append_op_expr(OpExpr *node, BSON *child_doc, pipeline_cxt *context) ReleaseSysCache(tuple); } +/* + * mongo_append_scalar_array_op_expr + * Deparse given scalar array operator expression. + * + * Build and append following syntax into $and array: + * + * {"$in": [ "$age", ... ] } + * + * or for negated (<> ALL) version: + * + * {"$not": {"$in": [ "$age", ... ] }} + * + * Each element of operator (e.g. "$eq") array is appended by function called + * mongo_append_column_name. + * + * In MongoDB, (null = null), (null < 1) is TRUE but that is FALSE in Postgres. + * To eliminate null value rows, add equality check for null values for columns + * involved in JOIN and WHERE clauses. E.g. add the following syntax: + * + * {"$ne": [ "$age", null ]} + */ +static void +mongo_append_scalar_array_op_expr(ScalarArrayOpExpr *node, BSON *child_doc, pipeline_cxt *context) +{ + HeapTuple tuple; + Form_pg_operator form; + char oprkind; + ListCell *arg; + BSON expr; + BSON child1; + BSON child_not; + char *mongo_operator; + int saved_array_index; + int reset_index = 0; + int and_index = 0; + BSON and_op; + BSON and_obj; + + /* Increament operator expression count */ + context->opExprCount++; + + /* Retrieve information about the operator from the system catalog. */ + tuple = SearchSysCache1(OPEROID, ObjectIdGetDatum(node->opno)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for operator %u", node->opno); + + form = (Form_pg_operator) GETSTRUCT(tuple); + oprkind = form->oprkind; + + /* Sanity check. */ + Assert((oprkind == 'r' && list_length(node->args) == 1) || + (oprkind == 'l' && list_length(node->args) == 1) || + (oprkind == 'b' && list_length(node->args) == 2)); + + if (context->isBoolExpr == true) + { + bsonAppendStartObject(child_doc, psprintf("%d", and_index++), + &and_obj); + bsonAppendStartArray(&and_obj, "$and", &and_op); + bsonAppendStartObject(&and_op, psprintf("%d", context->arrayIndex), + &expr); + } + else + bsonAppendStartObject(child_doc, psprintf("%d", context->arrayIndex), + &expr); + + /* Deparse operator name. */ + mongo_operator = "$in";//mongo_operator_name(psprintf("%s %s",get_opname(node->opno), node->useOr ? "ANY" : "ALL")); + if (node->useOr == false) { + bsonAppendStartObject(&expr, "$not", &child_not); + bsonAppendStartArray(&child_not, mongo_operator, &child1); + } else { + bsonAppendStartArray(&expr, mongo_operator, &child1); + } + + + /* Save array index */ + saved_array_index = context->arrayIndex; + + /* Reset to zero to be used for nested arrays */ + context->arrayIndex = reset_index; + + /* Deparse left operand. */ + if (oprkind == 'r' || oprkind == 'b') + { + arg = list_head(node->args); + mongo_append_expr(lfirst(arg), &child1, context); + } + + /* Deparse right operand. */ + if (oprkind == 'l' || oprkind == 'b') + { + if (oprkind == 'l') + context->arrayIndex = reset_index; + else + context->arrayIndex++; + arg = list_tail(node->args); + mongo_append_expr(lfirst(arg), &child1, context); + } + + /* Decreament operator expression count */ + context->opExprCount--; + + + if (node->useOr == false) { + bsonAppendFinishArray(&child_not, &child1); + bsonAppendFinishObject(&expr, &child_not); + } else { + bsonAppendFinishArray(&expr, &child1); + } + if (context->isBoolExpr) + bsonAppendFinishObject(&and_op, &expr); + else + bsonAppendFinishObject(child_doc, &expr); + + /* + * Add equality check for null values for columns involved in JOIN and + * WHERE clauses. + */ + if (context->opExprCount == 0) + { + List *var_list; + ListCell *lc; + + var_list = pull_var_clause((Node *) node, PVC_RECURSE_PLACEHOLDERS || + PVC_RECURSE_AGGREGATES); + + foreach(lc, var_list) + { + Var *var = (Var *) lfirst(lc); + + if (context->isBoolExpr) + bsonAppendStartObject(&and_op, psprintf("%d", and_index++), + &expr); + else + bsonAppendStartObject(child_doc, + psprintf("%d", context->arrayIndex++), + &expr); + mongo_add_null_check(var, &expr, context); + + if (context->isBoolExpr) + bsonAppendFinishObject(&and_op, &expr); + else + bsonAppendFinishObject(child_doc, &expr); + } + } + + if (context->isBoolExpr == true) + { + bsonAppendFinishArray(&and_obj, &and_op); + bsonAppendFinishObject(child_doc, &and_obj); + } + + /* Retain array index */ + context->arrayIndex = saved_array_index; + + ReleaseSysCache(tuple); +} + /* * mongo_append_column_name * Deparse Var and append corresponding column name to operator array. diff --git a/mongo_fdw.c b/mongo_fdw.c index 185a959..2ddfd19 100644 --- a/mongo_fdw.c +++ b/mongo_fdw.c @@ -425,15 +425,15 @@ mongoGetForeignRelSize(PlannerInfo *root, /* * Identify which baserestrictinfo clauses can be sent to the remote - * server and which can't. Only the OpExpr clauses are sent to the remote - * server. + * server and which can't. Only the OpExpr and ScalarArrayOpExpr clauses + * are sent to the remote server. */ foreach(lc, baserel->baserestrictinfo) { RestrictInfo *ri = (RestrictInfo *) lfirst(lc); #ifndef META_DRIVER - if (IsA(ri->clause, OpExpr) && + if ((IsA(ri->clause, OpExpr) || (IsA(ri->clause, ScalarArrayOpExpr)) && mongo_is_foreign_expr(root, baserel, ri->clause, false)) #else if (mongo_is_foreign_expr(root, baserel, ri->clause, false)) @@ -741,8 +741,8 @@ mongoGetForeignPlan(PlannerInfo *root, * previously determined to be safe or unsafe are shown in * fpinfo->remote_conds and fpinfo->local_conds. Anything else in the * restrictionClauses list will be a join clause, which we have to check - * for remote-safety. Only the OpExpr clauses are sent to the remote - * server. + * for remote-safety. Only the OpExpr and ScalarArrayOpExpr clauses + * are sent to the remote server. */ foreach(lc, restrictionClauses) { @@ -758,7 +758,7 @@ mongoGetForeignPlan(PlannerInfo *root, remote_exprs = lappend(remote_exprs, rinfo->clause); else if (list_member_ptr(fpinfo->local_conds, rinfo)) local_exprs = lappend(local_exprs, rinfo->clause); - else if (IsA(rinfo->clause, OpExpr) && + else if ((IsA(rinfo->clause, OpExpr) || IsA(rinfo->clause, ScalarArrayOpExpr)) && mongo_is_foreign_expr(root, foreignrel, rinfo->clause, false)) remote_exprs = lappend(remote_exprs, rinfo->clause); else diff --git a/mongo_fdw.h b/mongo_fdw.h index 5bcb220..74f903f 100644 --- a/mongo_fdw.h +++ b/mongo_fdw.h @@ -168,6 +168,7 @@ /* Defines for sending queries and converting types */ #define EQUALITY_OPERATOR_NAME "=" +#define INEQUALITY_OPERATOR_NAME "<>" #define INITIAL_ARRAY_CAPACITY 8 #define MONGO_TUPLE_COST_MULTIPLIER 5 #define MONGO_CONNECTION_COST_MULTIPLIER 5 diff --git a/mongo_query.c b/mongo_query.c index c901b12..cca2dbf 100644 --- a/mongo_query.c +++ b/mongo_query.c @@ -324,6 +324,7 @@ mongo_query_document(ForeignScanState *scanStateNode) context.isBoolExpr = false; context.isJoinClause = false; context.scanStateNode = scanStateNode; + context.arrayIndex = 0; bsonAppendStartArray(filter, "pipeline", &match_stage); @@ -515,6 +516,7 @@ mongo_query_document(ForeignScanState *scanStateNode) context.isBoolExpr = false; context.isJoinClause = true; context.scanStateNode = scanStateNode; + context.arrayIndex = 0; /* Form equivalent join qual clauses in MongoDB */ mongo_prepare_pipeline(joinclauses, &inner_pipeline, &context); @@ -698,6 +700,7 @@ mongo_query_document(ForeignScanState *scanStateNode) context.isBoolExpr = false; context.isJoinClause = false; context.scanStateNode = scanStateNode; + context.arrayIndex = 0; /* $match stage. Add a filter for the HAVING clause */ bsonAppendStartObject(&root_pipeline, psprintf("%d", root_index++), @@ -1233,6 +1236,48 @@ append_mongo_value(BSON *queryDocument, const char *keyName, Datum value, pfree(elem_nulls); } break; + case NAMEARRAYOID: + { + ArrayType *array; + Oid elmtype; + int16 elmlen; + bool elmbyval; + char elmalign; + int num_elems; + Datum *elem_values; + bool *elem_nulls; + int i; + BSON childDocument; + + array = DatumGetArrayTypeP(value); + elmtype = ARR_ELEMTYPE(array); + get_typlenbyvalalign(elmtype, &elmlen, &elmbyval, &elmalign); + + deconstruct_array(array, elmtype, elmlen, elmbyval, elmalign, + &elem_values, &elem_nulls, &num_elems); + + bsonAppendStartArray(queryDocument, keyName, &childDocument); + for (i = 0; i < num_elems; i++) + { + char *outputString; + Oid outputFunctionId; + bool typeVarLength; + bson_oid_t bsonObjectId; + + if (elem_nulls[i]) + continue; + + getTypeOutputInfo(NAMEOID, &outputFunctionId, &typeVarLength); + outputString = OidOutputFunctionCall(outputFunctionId, elem_values[i]); + bsonOidFromString(&bsonObjectId, outputString); + status = bsonAppendOid(&childDocument, keyName, &bsonObjectId); + } + + bsonAppendFinishArray(queryDocument, &childDocument); + pfree(elem_values); + pfree(elem_nulls); + } + break; case JSONOID: { char *outputString; @@ -1459,14 +1504,6 @@ foreign_expr_walker(Node *node, foreign_glob_cxt *glob_cxt, { Const *c = (Const *) node; - /* - * We don't push down operators where the constant is an - * array, since conditional operators for arrays in MongoDB - * aren't properly defined. - */ - if (OidIsValid(get_element_type(c->consttype))) - return false; - /* * If the constant has nondefault collation, either it's of a * non-builtin type, or it reflects folding of a CollateExpr. @@ -1475,7 +1512,10 @@ foreign_expr_walker(Node *node, foreign_glob_cxt *glob_cxt, */ collation = c->constcollid; if (collation == InvalidOid || - collation == DEFAULT_COLLATION_OID) + collation == DEFAULT_COLLATION_OID || + collation == C_COLLATION_OID || + collation == POSIX_COLLATION_OID + ) state = FDW_COLLATE_NONE; else state = FDW_COLLATE_UNSAFE; @@ -1489,8 +1529,9 @@ foreign_expr_walker(Node *node, foreign_glob_cxt *glob_cxt, * Bail out on planner internal params. We could perhaps pass * them to the remote server as regular params, but we don't * have the machinery to do that at the moment. + * TODO: Check if the param type oid is allowable */ - if (p->paramkind != PARAM_EXTERN) + if (p->paramkind != PARAM_EXTERN && p->paramkind != PARAM_EXEC) return false; /* @@ -1498,7 +1539,10 @@ foreign_expr_walker(Node *node, foreign_glob_cxt *glob_cxt, */ collation = p->paramcollid; if (collation == InvalidOid || - collation == DEFAULT_COLLATION_OID) + collation == DEFAULT_COLLATION_OID || + collation == C_COLLATION_OID || + collation == POSIX_COLLATION_OID + ) state = FDW_COLLATE_NONE; else state = FDW_COLLATE_UNSAFE; @@ -1558,7 +1602,11 @@ foreign_expr_walker(Node *node, foreign_glob_cxt *glob_cxt, else if (inner_cxt.state == FDW_COLLATE_SAFE && collation == inner_cxt.collation) state = FDW_COLLATE_SAFE; - else if (collation == DEFAULT_COLLATION_OID) + else if ( + collation == DEFAULT_COLLATION_OID || + collation == C_COLLATION_OID || + collation == POSIX_COLLATION_OID + ) state = FDW_COLLATE_NONE; else state = FDW_COLLATE_UNSAFE; @@ -1585,7 +1633,11 @@ foreign_expr_walker(Node *node, foreign_glob_cxt *glob_cxt, else if (inner_cxt.state == FDW_COLLATE_SAFE && collation == inner_cxt.collation) state = FDW_COLLATE_SAFE; - else if (collation == DEFAULT_COLLATION_OID) + else if ( + collation == DEFAULT_COLLATION_OID || + collation == C_COLLATION_OID || + collation == POSIX_COLLATION_OID + ) state = FDW_COLLATE_NONE; else state = FDW_COLLATE_UNSAFE; @@ -1721,12 +1773,68 @@ foreign_expr_walker(Node *node, foreign_glob_cxt *glob_cxt, else if (inner_cxt.state == FDW_COLLATE_SAFE && collation == inner_cxt.collation) state = FDW_COLLATE_SAFE; - else if (collation == DEFAULT_COLLATION_OID) + else if ( + collation == DEFAULT_COLLATION_OID || + collation == C_COLLATION_OID || + collation == POSIX_COLLATION_OID + ) state = FDW_COLLATE_NONE; else state = FDW_COLLATE_UNSAFE; } break; + case T_ScalarArrayOpExpr: + { + ScalarArrayOpExpr *saoe = (ScalarArrayOpExpr*) node; + char *oname = get_opname(saoe->opno); + + /* Don't support operator expression in grouping targets */ + if (IS_UPPER_REL(glob_cxt->foreignrel) && + !glob_cxt->is_having_cond) + return false; + +#ifndef META_DRIVER + /* Increment the operator expression count */ + glob_cxt->opexprcount++; +#endif + + /* + * We only support = ANY and <> ALL (basically $in and $nin) + * operators for joinclause of join relation. + */ + if (!(strncmp(oname, EQUALITY_OPERATOR_NAME, NAMEDATALEN) == 0 && saoe->useOr) && + !(strncmp(oname, INEQUALITY_OPERATOR_NAME, NAMEDATALEN) == 0 && !saoe->useOr)) + return false; + + /* + * Recurse to input subexpressions. + * + * We support same operators as joinclause for WHERE + * conditions of simple as well as join relation. + */ + if (!foreign_expr_walker((Node *) saoe->args, glob_cxt, + &inner_cxt) +#ifndef META_DRIVER + || (glob_cxt->opexprcount > 1) +#endif + ) + return false; + + /* + * If operator's input collation is not derived from a foreign + * Var, it can't be sent to remote. + */ + if (saoe->inputcollid == InvalidOid) + /* OK, inputs are all noncollatable */ ; + else if (inner_cxt.state != FDW_COLLATE_SAFE || + saoe->inputcollid != inner_cxt.collation) + return false; + + /* Output is always boolean and so noncollatable. */ + collation = InvalidOid; + state = FDW_COLLATE_NONE; + } + break; #endif default: @@ -1760,12 +1868,20 @@ foreign_expr_walker(Node *node, foreign_glob_cxt *glob_cxt, /* * Non-default collation always beats default. */ - if (outer_cxt->collation == DEFAULT_COLLATION_OID) + if ( + outer_cxt->collation == DEFAULT_COLLATION_OID || + outer_cxt->collation == C_COLLATION_OID || + outer_cxt->collation == POSIX_COLLATION_OID + ) { /* Override previous parent state */ outer_cxt->collation = collation; } - else if (collation != DEFAULT_COLLATION_OID) + else if ( + collation != DEFAULT_COLLATION_OID && + collation != C_COLLATION_OID && + collation != POSIX_COLLATION_OID + ) { /* * Conflict; show state as indeterminate. We don't