
How To Define Operations Of An SFTP Operator On Airflow?

class SFTPOperation(object):
    PUT = 'put'
    GET = 'get'

operation=SFTPOperation.GET,
NameError: name 'SFTPOperation' is not defined

I have defined operators here but I can

Solution 1:

The SFTP operator uses an SSH hook to open an SFTP transport channel, so you need to provide either ssh_hook or ssh_conn_id for the file transfer. First, let's see an example that provides the ssh_conn_id parameter.

import datetime

from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator

dag = DAG(
'test_dag',
start_date = datetime.datetime(2020,1,8,0,0,0),
schedule_interval = '@daily'
)

put_operation = SFTPOperator(
            task_id="operation",
            ssh_conn_id="ssh_default",
            local_filepath="route_to_local_file",
            remote_filepath="remote_route_to_copy",
            operation="put",
            dag=dag
            )
get_operation = SFTPOperator(....,
            operation = "get",
            dag = dag
            )

put_operation >> get_operation
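
The NameError in the question comes from referencing SFTPOperation without importing it. The plain strings "put" and "get" are enough for the operation argument, but if you prefer the constants, here is a minimal sketch. It assumes the class can be imported from airflow.providers.sftp.operators.sftp (this may differ between Airflow versions), and it falls back to a stand-in copy of the class quoted in the question so the snippet still runs where the provider package is not installed.

```python
try:
    # Assumed location in the SFTP provider package.
    from airflow.providers.sftp.operators.sftp import SFTPOperation
except ImportError:
    # Stand-in mirroring the class quoted in the question, used only
    # when the provider package is not installed.
    class SFTPOperation:
        PUT = "put"
        GET = "get"

# The constants are plain strings, so either form can be passed as `operation`.
get_kwargs = {"operation": SFTPOperation.GET}
put_kwargs = {"operation": SFTPOperation.PUT}
print(get_kwargs, put_kwargs)
```

Either dictionary could then be unpacked into an SFTPOperator call alongside task_id, ssh_conn_id and the file paths.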

Note that the DAG should be scheduled as your task requires; the test_dag example uses a daily schedule starting at midnight on 8 January 2020. If you provide an SSHHook instead, the following changes to the SFTPOperator definition are necessary:

# In Airflow 1.10 the import path was airflow.contrib.hooks.ssh_hook
from airflow.providers.ssh.hooks.ssh import SSHHook
...

put_operation = SFTPOperator(
            task_id="operation",
            ssh_hook=SSHHook(ssh_conn_id="Name_of_variable_defined"),
            ...
            dag=dag
            )
....

where "Name_of_variable_defined" is the ID of a connection created under Admin -> Connections in the Airflow web interface.
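
As an alternative to the web interface, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection ID, with the value given as a URI. The sketch below builds such a variable for the connection ID used above. The host, login and password are hypothetical placeholders, and the helper names conn_env_var and ssh_conn_uri are made up for this illustration; they are not part of Airflow.

```python
import os
from urllib.parse import quote


def conn_env_var(conn_id):
    """Name of the environment variable Airflow checks for a connection ID."""
    return "AIRFLOW_CONN_" + conn_id.upper()


def ssh_conn_uri(host, login, password, port=22):
    """Build an ssh:// connection URI, percent-encoding the credentials."""
    return "ssh://{}:{}@{}:{}".format(
        quote(login, safe=""), quote(password, safe=""), host, port
    )


# Hypothetical host and credentials, for illustration only.
uri = ssh_conn_uri("sftp.example.com", "user", "p@ss/word")
os.environ[conn_env_var("Name_of_variable_defined")] = uri
print(conn_env_var("Name_of_variable_defined"), "=", uri)
```

The variable has to be set in the environment of the Airflow scheduler and workers, not only in the shell where the DAG file is edited.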
