Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion components/airtable_oauth/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/airtable_oauth",
"version": "0.5.1",
"version": "0.5.2",
"description": "Pipedream Airtable (OAuth) Components",
"main": "airtable_oauth.app.mjs",
"keywords": [
Expand All @@ -15,7 +15,9 @@
"dependencies": {
"@pipedream/platform": "^3.0.3",
"airtable": "^0.11.1",
"async-retry": "^1.3.3",
"bottleneck": "^2.19.5",
"crypto": "^1.0.1",
"lodash.chunk": "^4.2.0",
"lodash.isempty": "^4.4.0",
"moment": "^2.30.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ export default {
fieldUpdateInfo,
] = Object.entries(fieldObj)[0];

const timestamp = Date.parse(payload.timestamp);
if (this.isDuplicateEvent(fieldId, timestamp)) return;
this._setLastObjectId(fieldId);
this._setLastTimestamp(timestamp);

const updateType = operation === "createdFieldsById"
? "created"
: "updated";
Expand Down
32 changes: 22 additions & 10 deletions components/airtable_oauth/sources/common/common-webhook-record.mjs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import common from "./common-webhook.mjs";
import retry from "async-retry";

export default {
...common,
Expand All @@ -9,6 +10,17 @@ export default {
"tableData",
];
},
withRetries(apiCall, retries = 3) {
return retry(async (bail) => {
try {
return await apiCall();
} catch (err) {
return bail(err);
}
}, {
retries,
});
},
async emitEvent(payload) {
const [
tableId,
Expand Down Expand Up @@ -43,20 +55,20 @@ export default {
recordUpdateInfo,
] = Object.entries(recordObj)[0];

const timestamp = Date.parse(payload.timestamp);
if (this.isDuplicateEvent(recordId, timestamp)) return;
this._setLastObjectId(recordId);
this._setLastTimestamp(timestamp);

let updateType = operation === "createdRecordsById"
? "created"
: "updated";

const { fields } = await this.airtable.getRecord({
baseId: this.baseId,
tableId,
recordId,
});
let fields = {};
try {
({ fields } = await this.withRetries(() => this.airtable.getRecord({
baseId: this.baseId,
tableId,
recordId,
})));
} catch (e) {
fields = {};
}

const summary = `Record ${updateType}: ${fields?.name ?? recordId}`;

Expand Down
75 changes: 51 additions & 24 deletions components/airtable_oauth/sources/common/common-webhook.mjs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createHmac } from "crypto";
import airtable from "../../airtable_oauth.app.mjs";
import constants from "../common/constants.mjs";

Expand Down Expand Up @@ -48,7 +49,9 @@ export default {
},
hooks: {
async activate() {
const { id } = await this.airtable.createWebhook({
const {
id, macSecretBase64,
} = await this.airtable.createWebhook({
baseId: this.baseId,
data: {
notificationUrl: `${this.http.endpoint}/`,
Expand Down Expand Up @@ -76,6 +79,7 @@ export default {
},
});
this._setHookId(id);
this._setMacSecretBase64(macSecretBase64);
},
async deactivate() {
const webhookId = this._getHookId();
Expand All @@ -94,28 +98,17 @@ export default {
_setHookId(hookId) {
this.db.set("hookId", hookId);
},
_getLastObjectId() {
return this.db.get("lastObjectId");
},
async _setLastObjectId(id) {
this.db.set("lastObjectId", id);
_getMacSecretBase64() {
return this.db.get("macSecretBase64");
},
_getLastTimestamp() {
return this.db.get("lastTimestamp");
_setMacSecretBase64(value) {
this.db.set("macSecretBase64", value);
},
async _setLastTimestamp(ts) {
this.db.set("lastTimestamp", ts);
_setLastCursor(cursor) {
this.db.set("lastCursor", cursor);
},
isDuplicateEvent(id, ts) {
const lastId = this._getLastObjectId();
const lastTs = this._getLastTimestamp();

if (id === lastId && (ts - lastTs < 5000 )) {
console.log("Skipping trigger: another event was emitted for the same object within the last 5 seconds");
return true;
}

return false;
_getLastCursor() {
return this.db.get("lastCursor");
},
getSpecificationOptions() {
throw new Error("getSpecificationOptions is not implemented");
Expand All @@ -135,7 +128,9 @@ export default {
},
emitDefaultEvent(payload) {
const meta = this.generateMeta(payload);
this.$emit(payload, meta);
this.$emit({
originalPayload: payload,
}, meta);
},
async emitEvent(payload) {
// sources may call this to customize event emission, but it is
Expand All @@ -147,30 +142,60 @@ export default {
// and it can be silently ignored when not required
return true;
},
isSignatureValid(signature, bodyRaw) {
const macSecretBase64FromCreate = this._getMacSecretBase64();
const macSecretDecoded = Buffer.from(macSecretBase64FromCreate, "base64");
const body = Buffer.from(bodyRaw, "utf8");
const hmac = createHmac("sha256", macSecretDecoded)
.update(body.toString(), "ascii")
.digest("hex");
const expectedContentHmac = "hmac-sha256=" + hmac;
return signature === expectedContentHmac;
},
payloadFilter() {
return true;
},
},
async run() {
async run({
bodyRaw, headers: { ["x-airtable-content-mac"]: signature },
}) {
const isValid = this.isSignatureValid(signature, bodyRaw);
if (!isValid) {
return this.http.respond({
status: 401,
});
}

this.http.respond({
status: 200,
});
// webhook pings source, we then fetch webhook events to emit
const webhookId = this._getHookId();
let hasMore = false;
const params = {};

try {
await this.saveAdditionalData();
} catch (err) {
console.log("Error fetching additional data, proceeding to event emission");
console.log(err);
}
const params = {
cursor: this._getLastCursor(),
};

do {
const {
cursor, mightHaveMore, payloads,
} = await this.airtable.listWebhookPayloads({
debug: true,
baseId: this.baseId,
webhookId,
params,
});
for (const payload of payloads) {

const filteredPayloads = payloads.filter(this.payloadFilter);

for (const payload of filteredPayloads) {
try {
await this.emitEvent(payload);
} catch (err) {
Expand All @@ -182,5 +207,7 @@ export default {
params.cursor = cursor;
hasMore = mightHaveMore;
} while (hasMore);

this._setLastCursor(params.cursor);
},
};
5 changes: 4 additions & 1 deletion components/airtable_oauth/sources/new-field/new-field.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ export default {
name: "New Field Created (Instant)",
description: "Emit new event when a field is created in the selected table. [See the documentation](https://airtable.com/developers/web/api/get-base-schema)",
key: "airtable_oauth-new-field",
version: "1.0.3",
version: "1.0.4",
type: "source",
dedupe: "unique",
methods: {
...common.methods,
payloadFilter(payload) {
return !!payload.changedTablesById;
},
getChangeTypes() {
return [
"add",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import common from "../common/common-webhook-record.mjs";
import constants from "../common/constants.mjs";
import sampleEmit from "./test-event.mjs";
import airtable from "../../airtable_oauth.app.mjs";

export default {
...common,
name: "New Record Created, Updated or Deleted (Instant)",
description: "Emit new event when a record is added, updated, or deleted in a table or selected view.",
key: "airtable_oauth-new-modified-or-deleted-records-instant",
version: "0.1.3",
version: "0.1.4",
type: "source",
dedupe: "unique",
props: {
Expand All @@ -27,7 +26,7 @@ export default {
},
watchDataInFieldIds: {
propDefinition: [
airtable,
common.props.airtable,
"sortFieldId",
(c) => ({
baseId: c.baseId,
Expand All @@ -42,6 +41,9 @@ export default {
},
methods: {
...common.methods,
payloadFilter(payload) {
return !!payload.changedTablesById;
},
getDataTypes() {
return [
"tableData",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ export default {
name: "New or Modified Field (Instant)",
description: "Emit new event when a field is created or updated in the selected table",
key: "airtable_oauth-new-or-modified-field",
version: "1.0.3",
version: "1.0.4",
type: "source",
dedupe: "unique",
methods: {
...common.methods,
payloadFilter(payload) {
return !!payload.changedTablesById;
},
getChangeTypes() {
return [
"add",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import common from "../common/common-webhook-record.mjs";
import airtable from "../../airtable_oauth.app.mjs";

export default {
...common,
name: "New or Modified Records (Instant)",
key: "airtable_oauth-new-or-modified-records",
description: "Emit new event for each new or modified record in a table or view",
version: "1.0.3",
version: "1.0.4",
type: "source",
dedupe: "unique",
methods: {
...common.methods,
payloadFilter(payload) {
return !!payload.changedTablesById;
},
getChangeTypes() {
return [
"add",
Expand All @@ -22,7 +24,7 @@ export default {
...common.props,
watchDataInFieldIds: {
propDefinition: [
airtable,
common.props.airtable,
"sortFieldId",
(c) => ({
baseId: c.baseId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ export default {
name: "New Record(s) Created (Instant)",
description: "Emit new event for each new record in a table",
key: "airtable_oauth-new-records",
version: "1.0.3",
version: "1.0.4",
type: "source",
dedupe: "unique",
methods: {
...common.methods,
payloadFilter(payload) {
return !!payload.changedTablesById;
},
getChangeTypes() {
return [
"add",
Expand Down
6 changes: 6 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading