Skip to content

prefect_fivetran.connectors

Tasks and flows for managing Fivetran connectors

set_fivetran_connector_schedule async

Take connector off Fivetran's schedule so that it can be controlled by Prefect.

Can also be used to place connector back on Fivetran's schedule with schedule_type = "auto".

Parameters:

Name Type Description Default
connector_id str

The id of the Fivetran connector to use in Prefect.

required
fivetran_credentials FivetranCredentials

The credentials to use to authenticate.

required
schedule_type str

Connector syncs periodically on Fivetran's schedule (auto), or whenever called by the API (manual).

'manual'

Returns:

Type Description
Dict

The response from the Fivetran API.

Examples:

Set a Fivetran connector's schedule in Prefect

from prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import set_fivetran_connector_schedule

@flow
def example_flow():
    # Credentials used to authenticate against the Fivetran API.
    fivetran_credentials = FivetranCredentials(
        api_key="my_api_key",
        api_secret="my_api_secret",
    )
    # schedule_type must be "manual" or "auto"; any other value raises ValueError.
    return set_fivetran_connector_schedule(
        connector_id="my_connector_id",
        fivetran_credentials=fivetran_credentials,
        schedule_type="manual",
    )

example_flow()

Source code in prefect_fivetran/connectors.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
@task(
    name="Set Fivetran connector schedule",
    description="Sets the schedule for a Fivetran connector.",
    retries=0,
)
async def set_fivetran_connector_schedule(
    connector_id: str,
    fivetran_credentials: FivetranCredentials,
    schedule_type: str = "manual",
) -> Dict:
    """
    Take connector off Fivetran's schedule so that it can be controlled
    by Prefect.

    Can also be used to place connector back on Fivetran's schedule
    with schedule_type = "auto".

    Args:
        connector_id: The id of the Fivetran connector to use in Prefect.
        fivetran_credentials: The credentials to use to authenticate.
        schedule_type: Connector syncs periodically on Fivetran's schedule (auto),
                or whenever called by the API (manual).

    Returns:
        The response from the Fivetran API: the PATCH response when the
        schedule had to be changed, otherwise the connector response that was
        fetched to inspect the current schedule.

    Raises:
        ValueError: If `schedule_type` is neither "manual" nor "auto".

    Examples:
        Set a Fivetran connector's schedule in Prefect
        ```python
        from prefect import flow
        from prefect_fivetran import FivetranCredentials
        from prefect_fivetran.connectors import set_fivetran_connector_schedule

        @flow
        def example_flow():
            fivetran_credentials = FivetranCredentials(
                api_key="my_api_key",
                api_secret="my_api_secret",
            )
            return set_fivetran_connector_schedule(
                connector_id="my_connector_id",
                fivetran_credentials=fivetran_credentials,
                schedule_type="manual",
            )

        example_flow()
        ```
    """
    if schedule_type not in ("manual", "auto"):
        raise ValueError('schedule_type must be either "manual" or "auto"')

    async with fivetran_credentials.get_fivetran() as fivetran_client:
        current = await fivetran_client.get_connector(connector_id=connector_id)

        if current["data"]["schedule_type"] == schedule_type:
            # Already on the requested schedule: skip the PATCH, but still hand
            # back a dict so the function never falls through and returns None.
            return current

        return await fivetran_client.patch_connector(
            connector_id=connector_id,
            data={"schedule_type": schedule_type},
        )

start_fivetran_connector_sync async

Start a Fivetran data sync.

Parameters:

Name Type Description Default
connector_id str

The id of the Fivetran connector to use in Prefect.

required
fivetran_credentials FivetranCredentials

The credentials to use to authenticate.

required

Returns:

Type Description
Dict

The timestamp of the end of the connector's last run, or now if it has not yet run.

Examples:

Start a Fivetran connector sync in Prefect

from prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import start_fivetran_connector_sync

@flow
def example_flow():
    # Credentials used to authenticate against the Fivetran API.
    fivetran_credentials = FivetranCredentials(
        api_key="my_api_key",
        api_secret="my_api_secret",
    )
    start_fivetran_connector_sync(
        connector_id="my_connector_id",
        fivetran_credentials=fivetran_credentials,
    )

# Run the flow from module level; calling it inside its own body would recurse.
example_flow()

Source code in prefect_fivetran/connectors.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
@task(
    name="Start Fivetran connector sync",
    description="Starts a Fivetran connector data sync.",
    retries=3,
    retry_delay_seconds=10,
)
async def start_fivetran_connector_sync(
    connector_id: str,
    fivetran_credentials: FivetranCredentials,
) -> str:
    """
    Start a Fivetran data sync.

    The connector is un-paused first if it is currently paused, and the
    timestamp of its most recent completed run (successful or failed) is
    captured before the new sync is forced.

    Args:
        connector_id: The id of the Fivetran connector to use in Prefect.
        fivetran_credentials: The credentials to use to authenticate.

    Returns:
        The timestamp of the end of the connector's last run, or now if it
        has not yet run.

    Examples:
        Start a Fivetran connector sync in Prefect
        ```python
        from prefect import flow
        from prefect_fivetran import FivetranCredentials
        from prefect_fivetran.connectors import start_fivetran_connector_sync

        @flow
        def example_flow():
            fivetran_credentials = FivetranCredentials(
                api_key="my_api_key",
                api_secret="my_api_secret",
            )
            start_fivetran_connector_sync(
                connector_id="my_connector_id",
                fivetran_credentials=fivetran_credentials,
            )

        example_flow()
        ```
    """
    async with fivetran_credentials.get_fivetran() as fivetran_client:
        connector_details = (
            await fivetran_client.get_connector(connector_id=connector_id)
        )["data"]
        succeeded_at = connector_details["succeeded_at"]
        failed_at = connector_details["failed_at"]

        # A paused connector has to be un-paused before a sync is forced.
        if connector_details["paused"]:
            await fivetran_client.patch_connector(
                connector_id=connector_id,
                data={"paused": False},
            )

        # Connector has never run: use the current time as the baseline.
        if succeeded_at is None and failed_at is None:
            succeeded_at = str(pendulum.now())

        # NOTE(review): one of the two values may still be None here, so
        # parse_timestamp is assumed to cope with None -- confirm in the client.
        succeeded_ts = fivetran_client.parse_timestamp(succeeded_at)
        failed_ts = fivetran_client.parse_timestamp(failed_at)

        # Keep the raw value of whichever run ended most recently; a tie goes
        # to failed_at.
        last_sync = succeeded_at if succeeded_ts > failed_ts else failed_at

        await fivetran_client.force_connector(connector_id=connector_id)

        return last_sync

trigger_fivetran_connector_sync_and_wait_for_completion async

Flow that triggers a connector sync and waits for the sync to complete.

Parameters:

Name Type Description Default
fivetran_credentials FivetranCredentials

Credentials for authenticating with Fivetran.

required
connector_id str

The ID of the Fivetran connector to trigger.

required
schedule_type str

Connector syncs periodically on Fivetran's schedule ("auto"), or whenever called by the API ("manual").

'manual'
poll_status_every_n_seconds int

Number of seconds to wait in between checks for sync completion.

30

Returns:

Type Description
Dict

Dict containing the timestamp of the end of the connector's run and its ID.

Examples:

Trigger a Fivetran data sync and wait for completion as a stand alone flow:

import asyncio
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import (
    trigger_fivetran_connector_sync_and_wait_for_completion,
)

# Credentials used to authenticate against the Fivetran API.
fivetran_credentials = FivetranCredentials(
    api_key="my_api_key",
    api_secret="my_api_secret",
)
# The flow is async, so drive it with asyncio.run when used stand-alone.
# schedule_type must be "manual" or "auto".
asyncio.run(
    trigger_fivetran_connector_sync_and_wait_for_completion(
        fivetran_credentials=fivetran_credentials,
        connector_id="my_connector_id",
        schedule_type="manual",
        poll_status_every_n_seconds=30,
    )
)
Trigger a Fivetran connector sync and wait for completion as a sub-flow:
import asyncio
from prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import (
    trigger_fivetran_connector_sync_and_wait_for_completion,
)

@flow
async def my_flow():
    ...
    fivetran_credentials = FivetranCredentials(
        api_key="my_api_key",
        api_secret="my_api_secret",
    )
    # `await` is only legal inside an async function, hence `async def my_flow`.
    fivetran_result = await trigger_fivetran_connector_sync_and_wait_for_completion(
        fivetran_credentials=fivetran_credentials,
        connector_id="my_connector_id",
        schedule_type="manual",
        poll_status_every_n_seconds=30,
    )
    ...

asyncio.run(my_flow())

Source code in prefect_fivetran/connectors.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
@flow(
    name="Trigger Fivetran connector sync and wait for completion",
    description="Triggers a Fivetran connector to move data and waits for the "
    "connector to complete.",
)
async def trigger_fivetran_connector_sync_and_wait_for_completion(
    connector_id: str,
    fivetran_credentials: FivetranCredentials,
    schedule_type: str = "manual",
    poll_status_every_n_seconds: int = 30,
) -> Dict:
    """
    Flow that triggers a connector sync and waits for the sync to complete.

    Runs, in order: connector verification, schedule update, sync start, and
    polling until the sync has finished.

    Args:
        fivetran_credentials: Credentials for authenticating with Fivetran.
        connector_id: The ID of the Fivetran connector to trigger.
        schedule_type: Connector syncs periodically on Fivetran's schedule ("auto"),
                or whenever called by the API ("manual").
        poll_status_every_n_seconds: Number of seconds to wait in between checks for
            sync completion.

    Returns:
        Dict containing the timestamp of the end of the connector's run and its ID.

    Raises:
        ValueError: Propagated from the underlying tasks when the connector is
            not fully set up, `schedule_type` is invalid, or the sync fails.

    Examples:
        Trigger a Fivetran data sync and wait for completion as a stand alone flow:
        ```python
        import asyncio
        from prefect_fivetran import FivetranCredentials
        from prefect_fivetran.connectors import (
            trigger_fivetran_connector_sync_and_wait_for_completion,
        )

        fivetran_credentials = FivetranCredentials(
            api_key="my_api_key",
            api_secret="my_api_secret",
        )
        asyncio.run(
            trigger_fivetran_connector_sync_and_wait_for_completion(
                fivetran_credentials=fivetran_credentials,
                connector_id="my_connector_id",
                schedule_type="manual",
                poll_status_every_n_seconds=30,
            )
        )
        ```
        Trigger a Fivetran connector sync and wait for completion as a sub-flow:
        ```python
        import asyncio
        from prefect import flow
        from prefect_fivetran import FivetranCredentials
        from prefect_fivetran.connectors import (
            trigger_fivetran_connector_sync_and_wait_for_completion,
        )

        @flow
        async def my_flow():
            ...
            fivetran_credentials = FivetranCredentials(
                api_key="my_api_key",
                api_secret="my_api_secret",
            )
            fivetran_result = await trigger_fivetran_connector_sync_and_wait_for_completion(
                fivetran_credentials=fivetran_credentials,
                connector_id="my_connector_id",
                schedule_type="manual",
                poll_status_every_n_seconds=30,
            )
            ...

        asyncio.run(my_flow())
        ```
    """  # noqa
    # The verification task raises when the connector is not ready and otherwise
    # returns the connector details, so reaching the next statement means the
    # connector can be synced. Running the steps unconditionally also guarantees
    # that `last_sync` is always bound before it is used below.
    await verify_fivetran_connector_status(
        connector_id=connector_id,
        fivetran_credentials=fivetran_credentials,
    )
    await set_fivetran_connector_schedule(
        connector_id=connector_id,
        fivetran_credentials=fivetran_credentials,
        schedule_type=schedule_type,
    )
    # Baseline timestamp: the wait task treats any later completion as "done".
    last_sync = await start_fivetran_connector_sync(
        connector_id=connector_id,
        fivetran_credentials=fivetran_credentials,
    )
    return await wait_for_fivetran_connector_sync(
        connector_id=connector_id,
        fivetran_credentials=fivetran_credentials,
        previous_completed_at=last_sync,
        poll_status_every_n_seconds=poll_status_every_n_seconds,
    )

verify_and_start_fivetran_connector_sync async

Flow that triggers a connector sync.

Ensures that Fivetran connector is correctly configured and disables connector schedule so that Prefect schedule can be used instead.

Parameters:

Name Type Description Default
fivetran_credentials FivetranCredentials

Credentials for authenticating with Fivetran.

required
connector_id str

The ID of the Fivetran connector to trigger.

required
schedule_type str

Connector syncs periodically on Fivetran's schedule ("auto"), or whenever called by the API ("manual").

'manual'

Returns:

Type Description
Dict

The timestamp of the end of the connector's last run, or now if it has not yet run.

Examples:

Trigger a Fivetran data sync:

import asyncio
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import verify_and_start_fivetran_connector_sync

# Credentials used to authenticate against the Fivetran API.
fivetran_credentials = FivetranCredentials(
    api_key="my_api_key",
    api_secret="my_api_secret",
)
# The flow is async, so drive it with asyncio.run when used stand-alone.
# schedule_type must be "manual" or "auto".
asyncio.run(
    verify_and_start_fivetran_connector_sync(
        fivetran_credentials=fivetran_credentials,
        connector_id="my_connector_id",
        schedule_type="manual",
    )
)
Trigger a Fivetran connector sync as a sub-flow:
import asyncio
from prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import verify_and_start_fivetran_connector_sync

@flow
async def my_flow():
    ...
    fivetran_credentials = FivetranCredentials(
        api_key="my_api_key",
        api_secret="my_api_secret",
    )
    # `await` is only legal inside an async function, hence `async def my_flow`.
    last_sync = await verify_and_start_fivetran_connector_sync(
        fivetran_credentials=fivetran_credentials,
        connector_id="my_connector_id",
        schedule_type="manual",
    )
    ...

asyncio.run(my_flow())

Source code in prefect_fivetran/connectors.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
@flow(
    name="Trigger Fivetran connector data sync",
    description="Starts a Fivetran data connector",
    retries=3,
    retry_delay_seconds=10,
)
async def verify_and_start_fivetran_connector_sync(
    connector_id: str,
    fivetran_credentials: FivetranCredentials,
    schedule_type: str = "manual",
) -> str:
    """
    Flow that triggers a connector sync.

    Ensures that Fivetran connector is correctly configured and disables connector
    schedule so that Prefect schedule can be used instead.

    Args:
        fivetran_credentials: Credentials for authenticating with Fivetran.
        connector_id: The ID of the Fivetran connector to trigger.
        schedule_type: Connector syncs periodically on Fivetran's schedule ("auto"),
                or whenever called by the API ("manual").

    Returns:
        The timestamp of the end of the connector's last run, or now if it
        has not yet run.

    Raises:
        ValueError: Propagated from the underlying tasks when the connector is
            not fully set up or `schedule_type` is invalid.

    Examples:
        Trigger a Fivetran data sync:
        ```python
        import asyncio
        from prefect_fivetran import FivetranCredentials
        from prefect_fivetran.connectors import (
            verify_and_start_fivetran_connector_sync,
        )

        fivetran_credentials = FivetranCredentials(
            api_key="my_api_key",
            api_secret="my_api_secret",
        )
        asyncio.run(
            verify_and_start_fivetran_connector_sync(
                fivetran_credentials=fivetran_credentials,
                connector_id="my_connector_id",
                schedule_type="manual",
            )
        )
        ```
        Trigger a Fivetran connector sync as a sub-flow:
        ```python
        import asyncio
        from prefect import flow
        from prefect_fivetran import FivetranCredentials
        from prefect_fivetran.connectors import (
            verify_and_start_fivetran_connector_sync,
        )

        @flow
        async def my_flow():
            ...
            fivetran_credentials = FivetranCredentials(
                api_key="my_api_key",
                api_secret="my_api_secret",
            )
            last_sync = await verify_and_start_fivetran_connector_sync(
                fivetran_credentials=fivetran_credentials,
                connector_id="my_connector_id",
                schedule_type="manual",
            )
            ...

        asyncio.run(my_flow())
        ```
    """
    # The verification task is async and must be awaited: without `await` the
    # call only creates a coroutine object, which is always truthy, so the check
    # never actually ran. The task raises when the connector is not ready and
    # otherwise returns the connector details.
    await verify_fivetran_connector_status(
        connector_id=connector_id,
        fivetran_credentials=fivetran_credentials,
    )
    await set_fivetran_connector_schedule(
        connector_id=connector_id,
        fivetran_credentials=fivetran_credentials,
        schedule_type=schedule_type,
    )
    return await start_fivetran_connector_sync(
        connector_id=connector_id,
        fivetran_credentials=fivetran_credentials,
    )

verify_fivetran_connector_status async

Ensure that Fivetran connector is ready to sync data.

Parameters:

Name Type Description Default
connector_id str

The id of the Fivetran connector to use in Prefect.

required
fivetran_credentials FivetranCredentials

The credentials to use to authenticate.

required

Returns:

Type Description
Dict

The "data" portion of the Fivetran API response describing the connector.

Examples:

Check a Fivetran connector in Prefect

from prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import verify_fivetran_connector_status

@flow
def example_flow():
    # Credentials used to authenticate against the Fivetran API.
    fivetran_credentials = FivetranCredentials(
        api_key="my_api_key",
        api_secret="my_api_secret",
    )
    # Raises ValueError unless the connector's setup_state is "connected".
    return verify_fivetran_connector_status(
        connector_id="my_connector_id",
        fivetran_credentials=fivetran_credentials,
    )

example_flow()

Source code in prefect_fivetran/connectors.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@task(
    name="Verify Fivetran connector status",
    description="Checks that a Fivetran connector is ready to sync data.",
    retries=0,
)
async def verify_fivetran_connector_status(
    connector_id: str,
    fivetran_credentials: FivetranCredentials,
) -> Dict:
    """
    Confirm that a Fivetran connector has finished setup and can sync data.

    Args:
        connector_id: The id of the Fivetran connector to use in Prefect.
        fivetran_credentials: The credentials to use to authenticate.

    Returns:
        The "data" portion of the Fivetran API response for the connector.

    Raises:
        ValueError: If `connector_id` is empty, or if the connector's
            setup state is anything other than "connected".

    Examples:
        Check a Fivetran connector in Prefect
        ```python
        from prefect import flow
        from prefect_fivetran import FivetranCredentials
        from prefect_fivetran.connectors import verify_fivetran_connector_status

        @flow
        def example_flow():
            fivetran_credentials = FivetranCredentials(
                api_key="my_api_key",
                api_secret="my_api_secret",
            )
            return verify_fivetran_connector_status(
                connector_id="my_connector_id",
                fivetran_credentials=fivetran_credentials,
            )

        example_flow()
        ```
    """
    if not connector_id:
        raise ValueError("Value for parameter `connector_id` must be provided.")

    async with fivetran_credentials.get_fivetran() as client:
        response = await client.get_connector(connector_id=connector_id)
        details = response["data"]

        # Dashboard link pointing the user at the connector's setup page; built
        # before the state check so the lookups happen in the same order.
        setup_url = "https://fivetran.com/dashboard/connectors/{}/{}/setup".format(
            details["service"], details["schema"]
        )
        state = details["status"]["setup_state"]

        # Happy path: setup is complete, hand the connector details back.
        if state == "connected":
            return details

        template = (
            'Fivetran connector "{}" not correctly configured, status: {}; '
            "please complete setup at {}"
        )
        raise ValueError(template.format(connector_id, state, setup_url))

wait_for_fivetran_connector_sync async

Wait for the previously started Fivetran connector to finish.

Parameters:

Name Type Description Default
connector_id str

ID of the Fivetran connector with which to interact.

required
previous_completed_at str

Time of the end of the connector's last run

required
poll_status_every_n_seconds int

Frequency in which Prefect will check status of Fivetran connector's sync completion

15

Returns:

Type Description
Dict

Dict containing the timestamp of the end of the connector's run and its ID.

Examples:

Run and finish a Fivetran connector in Prefect

from prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import (
    start_fivetran_connector_sync,
    wait_for_fivetran_connector_sync,
)

@flow
def example_flow():
    # Credentials used to authenticate against the Fivetran API.
    fivetran_credentials = FivetranCredentials(
        api_key="my_api_key",
        api_secret="my_api_secret",
    )

    # Timestamp of the last completed run, captured before the new sync starts.
    last_sync = start_fivetran_connector_sync(
        connector_id="my_connector_id",
        fivetran_credentials=fivetran_credentials,
    )

    return wait_for_fivetran_connector_sync(
        connector_id="my_connector_id",
        fivetran_credentials=fivetran_credentials,
        previous_completed_at=last_sync,
        poll_status_every_n_seconds=60,
    )

example_flow()

Source code in prefect_fivetran/connectors.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
@task(
    name="Wait on a Fivetran connector data sync",
    description="Halts execution of flow until Fivetran connector data sync completes",
    retries=3,
    retry_delay_seconds=10,
)
async def wait_for_fivetran_connector_sync(
    connector_id: str,
    fivetran_credentials: FivetranCredentials,
    previous_completed_at: str,
    poll_status_every_n_seconds: int = 15,
) -> Dict:
    """
    Wait for the previously started Fivetran connector to finish.

    Polls the connector until a run has completed after
    `previous_completed_at`, sleeping between checks.

    Args:
        connector_id: ID of the Fivetran connector with which to interact.
        fivetran_credentials: The credentials to use to authenticate.
        previous_completed_at: Time of the end of the connector's last run
        poll_status_every_n_seconds: Frequency in which Prefect will check status of
            Fivetran connector's sync completion

    Returns:
        Dict containing the timestamp of the end of the connector's run and its ID.

    Raises:
        ValueError: If the connector reports a failure later than
            `previous_completed_at`.

    Examples:
        Run and finish a Fivetran connector in Prefect
        ```python
        from prefect import flow
        from prefect_fivetran import FivetranCredentials
        from prefect_fivetran.connectors import (
            start_fivetran_connector_sync,
            wait_for_fivetran_connector_sync,
        )

        @flow
        def example_flow():
            fivetran_credentials = FivetranCredentials(
                api_key="my_api_key",
                api_secret="my_api_secret",
            )

            last_sync = start_fivetran_connector_sync(
                connector_id="my_connector_id",
                fivetran_credentials=fivetran_credentials,
            )

            return wait_for_fivetran_connector_sync(
                connector_id="my_connector_id",
                fivetran_credentials=fivetran_credentials,
                previous_completed_at=last_sync,
                poll_status_every_n_seconds=60,
            )

        example_flow()
        ```
    """
    logger = get_run_logger()
    async with fivetran_credentials.get_fivetran() as fivetran_client:
        # The baseline never changes while polling, so parse it once up front.
        previous_completed = fivetran_client.parse_timestamp(previous_completed_at)

        while True:
            current_details = (
                await fivetran_client.get_connector(connector_id=connector_id)
            )["data"]
            succeeded_at = fivetran_client.parse_timestamp(
                current_details["succeeded_at"]
            )
            failed_at = fivetran_client.parse_timestamp(current_details["failed_at"])

            # The only way to tell if a sync failed is to check if its latest
            # failed_at value is greater than the last known "sync completed at"
            # value.
            if failed_at > previous_completed:
                raise ValueError(
                    f'Fivetran sync for connector "{connector_id}" failed. '
                    f'Please see logs at https://fivetran.com/dashboard/connectors/{current_details["service"]}/{current_details["schema"]}/logs'  # noqa
                )

            # Started sync will spend some time in the 'scheduled' state before
            # transitioning to 'syncing'.
            # Capture the transition from 'scheduled' to 'syncing' or 'rescheduled',
            # and then back to 'scheduled' on completion.
            sync_state = current_details["status"]["sync_state"]
            logger.info(
                'Connector "{}" current sync_state = {}'.format(
                    connector_id, sync_state
                )
            )

            # failed_at is known to be <= the baseline at this point, so the most
            # recent completion is newer than the baseline exactly when
            # succeeded_at is.
            if succeeded_at > previous_completed:
                return {
                    "succeeded_at": succeeded_at.to_iso8601_string(),
                    "connector_id": connector_id,
                }

            await asyncio.sleep(poll_status_every_n_seconds)