Skip to content

Commit f6f8509

Browse files
authored
cosmos: Allow the Query Pipeline to return an alternative query to execute in each request (#3166)
For Hybrid Search, as well as ReadMany support, the query engine will need to direct the SDK to issue **multiple different queries**. However, the Rust SDK's query executor expects to be able to replay the original query on each request. This PR changes that expectation to allow (but not require) each Query Request coming from the pipeline to provide an alternate query. When an alternate query is provided, the pipeline can also indicate if the parameters from the original query should be sent. Often, the alternate query does NOT use the parameters, but in some circumstances (hybrid search) it will. There are also some small fixes for doc tests and error checking in the engine-less cross-partition query tests. Not sure why these are coming up here and not in `main`.
1 parent 527499e commit f6f8509

File tree

8 files changed

+395
-67
lines changed

8 files changed

+395
-67
lines changed

sdk/cosmos/azure_data_cosmos/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- Adjusted the query engine abstraction to support future enhancements and optimizations. ([#3166](https://github.com/Azure/azure-sdk-for-rust/pull/3166))
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl CosmosClient {
110110
/// use azure_core::credentials::Secret;
111111
///
112112
/// let client = CosmosClient::with_connection_string(
113-
/// "AccountEndpoint=https://accountname.documents.azure.com:443/‌​;AccountKey=accountk‌​ey",
113+
/// Secret::from("AccountEndpoint=https://accountname.documents.azure.com:443/‌​;AccountKey=accountk‌​ey"),
114114
/// None)
115115
/// .unwrap();
116116
/// ```

sdk/cosmos/azure_data_cosmos/src/query/engine.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,42 @@ pub struct QueryRequest {
33
/// The ID of the partition key range to query.
44
pub partition_key_range_id: String,
55

6+
/// The index of this request, within the partition key range.
7+
///
8+
/// This value will always increase for subsequent requests for the same partition key range.
9+
/// It must be provided back to the pipeline when providing data, so that the pipeline can ensure that data is provided in order.
10+
pub index: usize,
11+
612
/// The continuation to use, if any.
713
pub continuation: Option<String>,
14+
15+
/// The query to execute for this partition key range, if different from the original query.
16+
pub query: Option<String>,
17+
18+
/// If a query is specified, this flag indicates if the query parameters should be included with that query.
19+
///
20+
/// Sometimes, when an override query is specified, it differs in structure from the original query, and the original parameters are not valid.
21+
pub include_parameters: bool,
22+
23+
/// If specified, indicates that the SDK should IMMEDIATELY drain all remaining results from this partition key range, following continuation tokens, until no more results are available.
24+
/// All the data from this partition key range should be provided BEFORE any new items will be made available.
25+
///
26+
/// This allows engines to optimize for non-streaming scenarios, where the entire result set must be provided to the engine before it can make progress.
27+
pub drain: bool,
828
}
929

1030
/// The request of a single-partition query for a specific partition key range.
1131
pub struct QueryResult<'a> {
32+
/// The ID of the partition key range that was queried.
1233
pub partition_key_range_id: &'a str,
34+
35+
/// The index of the [`QueryRequest`] that generated this result.
36+
pub request_index: usize,
37+
38+
/// The continuation token to be used for the next request, if any.
1339
pub next_continuation: Option<String>,
40+
41+
/// The raw body of the response from the query.
1442
pub result: &'a [u8],
1543
}
1644

@@ -38,7 +66,16 @@ pub trait QueryPipeline: Send {
3866
fn run(&mut self) -> azure_core::Result<PipelineResult>;
3967

4068
/// Provides additional single-partition data to the pipeline.
41-
fn provide_data(&mut self, data: QueryResult) -> azure_core::Result<()>;
69+
///
70+
/// Data from multiple partition ranges may be provided at once.
71+
/// However, each page of data must be provided in order.
72+
/// So, for any given partition key range, page n's results must be earlier in the `data` vector than page n+1's results.
73+
/// Data from different partition key ranges may be interleaved, as long as each partition key range's pages are in order.
74+
///
75+
/// The pipeline will use the [`QueryResult::request_index`] field to validate this.
76+
///
77+
/// When providing data from a draining request (i.e. a request with `drain = true`), all pages for that draining request can share the same [`QueryResult::request_index`].
78+
fn provide_data(&mut self, data: Vec<QueryResult>) -> azure_core::Result<()>;
4279
}
4380

4481
/// Provides an interface to a query engine, which constructs query pipelines.

sdk/cosmos/azure_data_cosmos/src/query/executor.rs

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -128,36 +128,61 @@ impl<T: DeserializeOwned + Send + 'static> QueryExecutor<T> {
128128
}
129129

130130
// No items, so make any requests we need to make and provide them to the pipeline.
131+
// TODO: We can absolutely parallelize these requests.
131132
for request in results.requests {
132-
let mut query_request = base_request.clone();
133+
let mut query_request = if let Some(query) = request.query {
134+
let mut query = Query::from(query);
135+
if request.include_parameters {
136+
query = query.with_parameters_from(&self.query)
137+
}
138+
crate::pipeline::create_base_query_request(
139+
self.http_pipeline.url(&self.items_link),
140+
&query,
141+
)?
142+
} else {
143+
base_request.clone()
144+
};
145+
133146
query_request.insert_header(
134147
constants::PARTITION_KEY_RANGE_ID,
135148
request.partition_key_range_id.clone(),
136149
);
137-
if let Some(continuation) = request.continuation {
138-
query_request.insert_header(constants::CONTINUATION, continuation);
139-
}
140150

141-
let resp = self
142-
.http_pipeline
143-
.send_raw(
144-
self.context.to_borrowed(),
145-
&mut query_request,
146-
self.items_link.clone(),
147-
)
148-
.await?;
149-
150-
let next_continuation =
151-
resp.headers().get_optional_string(&constants::CONTINUATION);
152-
let body = resp.into_body();
153-
154-
let result = QueryResult {
155-
partition_key_range_id: &request.partition_key_range_id,
156-
next_continuation,
157-
result: &body,
158-
};
159-
160-
pipeline.provide_data(result)?;
151+
let mut fetch_more_pages = true;
152+
while fetch_more_pages {
153+
if let Some(c) = request.continuation.clone() {
154+
query_request.insert_header(constants::CONTINUATION, c);
155+
} else {
156+
// Make sure we don't send a continuation header if we don't have one, even if we did on a previous iteration.
157+
query_request.headers_mut().remove(constants::CONTINUATION);
158+
}
159+
160+
let resp = self
161+
.http_pipeline
162+
.send_raw(
163+
self.context.to_borrowed(),
164+
&mut query_request,
165+
self.items_link.clone(),
166+
)
167+
.await?;
168+
169+
let next_continuation =
170+
resp.headers().get_optional_string(&constants::CONTINUATION);
171+
172+
fetch_more_pages = request.drain && next_continuation.is_some();
173+
174+
let body = resp.into_body();
175+
let result = QueryResult {
176+
partition_key_range_id: &request.partition_key_range_id,
177+
request_index: request.index,
178+
next_continuation,
179+
result: &body,
180+
};
181+
182+
// For now, just provide a single result at a time.
183+
// When we parallelize requests, we can more easily provide multiple results at once.
184+
pipeline.provide_data(vec![result])?;
185+
}
161186
}
162187

163188
// No items, but we provided more data (probably), so continue the loop.

sdk/cosmos/azure_data_cosmos/src/query/mod.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,15 @@ impl Query {
108108
Ok(self)
109109
}
110110

111+
/// Replaces all parameters in this [`Query`] instance with the parameters from another [`Query`] instance, and returns it.
112+
///
113+
/// Since the parameters in the other query are already serialized, this method cannot fail.
114+
#[cfg(feature = "preview_query_engine")] // Crate-private for now, and thus only in the preview_query_engine feature (which is the only place it's used).
115+
pub(crate) fn with_parameters_from(mut self, other: &Query) -> Self {
116+
self.parameters = other.parameters.clone();
117+
self
118+
}
119+
111120
/// Consumes this [`Query`] instance, replaces its text with the provided value, and returns it.
112121
pub fn with_text(mut self, text: String) -> Self {
113122
self.text = text;
@@ -248,4 +257,37 @@ mod tests {
248257
assert_eq!(query.parameters.len(), 2);
249258
Ok(())
250259
}
260+
261+
#[test]
262+
#[cfg(feature = "preview_query_engine")]
263+
pub fn with_parameters_from_replaces_all_parameters() -> Result<(), Box<dyn Error>> {
264+
let source_query = Query::from("SELECT * FROM c")
265+
.with_parameter("@id", 42)?
266+
.with_parameter("@name", "Contoso")?;
267+
268+
let target_query = Query::from("SELECT c.value FROM c WHERE c.id = @id AND c.name = @name")
269+
.with_parameter("@old_param", "old_value")?
270+
.with_parameters_from(&source_query);
271+
272+
// Check that the text is preserved from the target query
273+
assert_eq!(
274+
target_query.text,
275+
"SELECT c.value FROM c WHERE c.id = @id AND c.name = @name"
276+
);
277+
278+
// Check that parameters are replaced with those from source query
279+
assert_eq!(target_query.parameters.len(), 2);
280+
assert_eq!(target_query.parameters[0].name, "@id");
281+
assert_eq!(
282+
target_query.parameters[0].value,
283+
serde_json::Value::Number(serde_json::Number::from(42))
284+
);
285+
assert_eq!(target_query.parameters[1].name, "@name");
286+
assert_eq!(
287+
target_query.parameters[1].value,
288+
serde_json::Value::String("Contoso".to_string())
289+
);
290+
291+
Ok(())
292+
}
251293
}

sdk/cosmos/azure_data_cosmos/tests/cosmos_query.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
use std::error::Error;
44

5-
use azure_core::{error::ErrorResponse, http::StatusCode};
5+
use azure_core::http::StatusCode;
66
use azure_core_test::{recorded, TestContext};
7-
use azure_data_cosmos::Query;
7+
use azure_data_cosmos::{constants, Query};
88
use framework::{test_data, MockItem, TestAccount};
99
use futures::TryStreamExt;
1010

@@ -156,15 +156,16 @@ pub async fn cross_partition_query_with_order_by_fails_without_query_engine(
156156
};
157157
assert_eq!(Some(StatusCode::BadRequest), err.http_status());
158158

159-
let error_response = ErrorResponse::try_from(err).expect("expected an HttpResponse error");
160-
let message = error_response
161-
.error
162-
.expect("error should be present")
163-
.message
164-
.expect("message should be present");
165-
assert!(message.starts_with(
166-
"The provided cross partition query can not be directly served by the gateway."
167-
));
159+
let response =
160+
if let azure_core::error::ErrorKind::HttpResponse { raw_response, .. } = err.kind() {
161+
raw_response.as_ref().unwrap().clone()
162+
} else {
163+
panic!("expected an HTTP response error");
164+
};
165+
let sub_status = response.headers().get_optional_str(&constants::SUB_STATUS);
166+
167+
// 1004 = CrossPartitionQueryNotServable
168+
assert_eq!(Some("1004"), sub_status);
168169

169170
account.cleanup().await?;
170171
Ok(())

0 commit comments

Comments
 (0)