
Pierce Lamb: Custom Workflow Orchestration in Python

Posted on August 11, 2022 by Michael G

This post originally appeared on my employer VISO Trust’s blog. It is lightly edited and reproduced here.

On the Data & Machine Learning team at VISO Trust, one of our core goals is to provide Document Intelligence to the audit team. Every Document that passes through the system is subject to collection, parsing, reformatting, analysis, reporting and more. Every day, we work to expand this feature set, increase its accuracy and deliver faster results.

Why we needed workflow orchestration

Many individual tasks are executed to produce what Document Intelligence ultimately provides, including but not limited to:

  • Security Control Language Detections
  • Audit Framework Control ID Detections
  • Named Entity Extraction (organizations, dates and more)
  • Decryption of encrypted PDFs
  • Translation of foreign-language PDFs
  • Document Classification
  • Document Section Detection

Until our workflow orchestration implementation, the features listed above and more were all represented in code inside a single function. Over time, this function became unwieldy and difficult to read, with snippets of ceremony, controls, logging, function calls and more sprinkled throughout. Moreover, this is one of the most important areas of our app, where new features will be implemented regularly, so the need to clean this code up and make it easier to reason about became clear. Furthermore, execution inside this function occurred sequentially even though some of its function calls could run in parallel. Parallel execution isn't required in the current state, but we knew that features on the near-term roadmap would necessitate it. With these two requirements:

  • task execution that is easier to reason about and
  • the ability to execute in parallel

We knew we needed to either use an existing workflow orchestration tool or write our own. We began with some rough analysis of what was going on in our main automation function: we formalized each 'step' into a concept called a Task and theorized about which Tasks could execute in parallel. At the time of the analysis, we had 11 Tasks, each of which required certain inputs and produced certain outputs; based on these inputs and outputs, we determined that a number could run in parallel. With this context, we reviewed some of the major open source Python toolkits for workflow orchestration:

  • Apache Airflow
  • Luigi

Both of these toolkits are designed for managing workflows with tens, hundreds, or even thousands of tasks that can take days or weeks to finish. They have complex schedulers, user interfaces, failure modes, options for a variety of input and output modes, and more. Our pipeline may reach that level of complexity someday, but with an 11-Task pipeline we decided these toolkits added too much complexity for our use case. We resolved to build a custom workflow orchestration toolkit guided by the deep knowledge embedded in these more advanced tools.

Our custom workflow orchestration

The first goal was to generalize all of the steps in our automation service into the concept of a Task. A few examples of a Task would be:

  • detecting a document’s language,
  • translating a foreign language document,
  • processing OCR results into raw text,
  • detecting keywords inside text,
  • running machine learning inference on text.

Just reading this list gives one a feel for how each Task depends on a previous Task's output to run. Being explicit about dependencies is core to workflow orchestration, so the first step in our Task concept was defining what inputs a given Task requires and what outputs it will produce. To demonstrate Tasks, we will develop a fake example Task called DocClassifyInference, the goal of which is to run ML inference to classify a given document. Imagine that our model uses both images of the raw PDF file and the text inside it to make predictions. Our Task, then, will require the decrypted PDF and the paginated text of the PDF in order to execute. Further, when it's complete it will write a file to S3 containing its results. Thus, the start of our example Task might look like:

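(The snippet originally embedded here did not survive syndication; below is a minimal sketch of what the text describes. The key names, like decrypted_pdf and paginated_text, are assumptions, not the original code.)

from abc import ABC, abstractmethod

class Task(ABC):
    """Base Task: subclasses declare input/output keys and an execute method."""
    input_keys: set = set()
    output_keys: set = set()

    @abstractmethod
    def execute(self, pipeline: "Pipeline") -> "Pipeline":
        ...

class S3Task(Task):
    """A Task whose results are also written to S3."""

    @abstractmethod
    def write_to_s3(self, result) -> None:
        ...

class DocClassifyInference(S3Task):
    # Requires the decrypted PDF and its paginated text;
    # produces a document classification.
    input_keys = {"decrypted_pdf", "paginated_text"}
    output_keys = {"doc_classification"}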

DocClassifyInference subclasses S3Task, an abstract class that requires subclasses to define a method for writing to S3. S3Task itself is a subclass of the Task class, which requires that subclasses define input keys, output keys and an execute method. The keys are enforced in a Pipeline class:

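(Again, the original embed is gone; this is a sketch of a Pipeline that keeps Task state in an in-memory dict. The method names are assumptions.)

class Pipeline:
    """State manager passed from Task to Task; holds state in-memory."""

    def __init__(self, initial_state: dict | None = None):
        self._state = dict(initial_state or {})

    def get(self, key: str):
        if key not in self._state:
            raise KeyError(f"'{key}' has not been produced by an upstream Task")
        return self._state[key]

    def set(self, key: str, value) -> None:
        self._state[key] = value

    def run_task(self, task: Task) -> "Pipeline":
        # Enforce that every declared input exists before the Task runs.
        missing = {k for k in task.input_keys if k not in self._state}
        if missing:
            raise KeyError(f"{type(task).__name__} is missing inputs: {missing}")
        return task.execute(self)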

This Pipeline becomes the object that manages state as our Tasks execute. In our case we were not approaching memory limits, so we decided to keep much of the Task state in-memory, though this could easily be changed to always write to and read from storage. As a state manager, the Pipeline can also capture, prior to executing any Tasks, ceremony that downstream Tasks may require.

Continuing on with DocClassifyInference: as a subclass of the abstract class Task, DocClassifyInference must also implement def execute (enforced by Task). This method takes a Pipeline and returns a Pipeline. In essence, it receives the state manager, modifies the state and returns it so the next Task can operate on it. In our example, execute will extract the decrypted PDF and paginated text so they can be used as inputs for an ML model to perform document classification. Let's look at the entire stubbed-out DocClassifyInference:

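(A reconstructed sketch under the same assumptions as above; the model call and S3 write are stubs.)

class DocClassifyInference(S3Task):
    input_keys = {"decrypted_pdf", "paginated_text"}
    output_keys = {"doc_classification"}

    def execute(self, pipeline: Pipeline) -> Pipeline:
        # Extract this Task's declared inputs from the state manager.
        decrypted_pdf = pipeline.get("decrypted_pdf")
        paginated_text = pipeline.get("paginated_text")

        # Run model inference on the PDF images and text (stubbed here).
        classification = self._classify(decrypted_pdf, paginated_text)

        # Persist the result and set the declared output for downstream Tasks.
        self.write_to_s3(classification)
        pipeline.set("doc_classification", classification)
        return pipeline

    def _classify(self, pdf_bytes, pages) -> str:
        return "SOC 2 Type II"  # placeholder prediction

    def write_to_s3(self, result) -> None:
        pass  # e.g. a boto3 put_object call in the real Task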

It's easy to see how DocClassifyInference gets the Pipeline state, extracts what it needs, operates on that data, sets what it declared it would set and returns the Pipeline. This allows for an API like this:

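(A sketch of that calling style; DecryptPDF and PaginatedText are stand-ins for the other Tasks named later in this post.)

pipeline = Pipeline({"encrypted_pdf": encrypted_pdf_bytes})
pipeline = DecryptPDF().execute(pipeline)
pipeline = PaginatedText().execute(pipeline)
pipeline = DocClassifyInference().execute(pipeline)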

This, of course, is much cleaner than what we had previously. It also lends itself to writing simple, understandable unit tests per Task, and it adheres more closely to functional programming principles. So this solves our first goal of making the code cleaner and easier to reason about. What about parallel processing?

Parallel Processing

Similar to Luigi and Apache Airflow, the goal of our workflow orchestration is to generate a topologically sorted Directed Acyclic Graph (DAG) of Tasks. In short, having each Task explicitly define its required inputs and intended outputs allows the Tasks to be sorted for optimal execution. We no longer need to write the Tasks down in sequential order like the API described above; instead, we can pass a Task Planner a list of Tasks and let it decide how to execute them optimally. What we want, then, is a Task Planner that takes a list of Tasks, sorts them topologically and returns a list of stages, where each stage is itself a list of Tasks. Let's take a look at what this might look like using some of our examples from above:

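(Sketch of the Task list handed to the planner, with the two new Tasks stubbed in the same style as before; key names remain assumptions.)

class DecryptPDF(Task):
    input_keys = {"encrypted_pdf"}
    output_keys = {"decrypted_pdf"}
    def execute(self, pipeline): ...

class PaginatedText(Task):
    input_keys = {"decrypted_pdf"}
    output_keys = {"paginated_text"}
    def execute(self, pipeline): ...

class KeywordDetection(Task):
    input_keys = {"paginated_text"}
    output_keys = {"keyword_matches"}
    def execute(self, pipeline): ...

class CreateCSVOutput(Task):
    input_keys = {"doc_classification", "keyword_matches"}
    output_keys = {"csv_output"}
    def execute(self, pipeline): ...

# The order here is deliberately arbitrary; the Task Planner will sort it.
tasks = [
    CreateCSVOutput(),
    DocClassifyInference(),
    KeywordDetection(),
    DecryptPDF(),
    PaginatedText(),
]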

Here I have retained our examples while adding two new Tasks: KeywordDetection and CreateCSVOutput. You can think of these as matching keywords in the paginated text and combining the results of DocClassifyInference and KeywordDetection into a formatted CSV output. When the Task Planner receives this list, we want it to topologically sort the Tasks and output a data structure that looks like this:

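(Reconstructed from the stage-by-stage description that follows.)

expected_task_plan = [
    [DecryptPDF()],                                # stage 1: sequential
    [PaginatedText()],                             # stage 2: sequential
    [DocClassifyInference(), KeywordDetection()],  # stage 3: parallel
    [CreateCSVOutput()],                           # stage 4: sequential
]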

In the above list, each member is a 'stage' of execution. Each stage has one-to-many Tasks; with a single Task, execution occurs sequentially, and with many, execution occurs in parallel. In English, the expected_task_plan can be described like so:

  • DecryptPDF depends on nothing and creates a consumable PDF,
  • PaginatedText depends on a consumable PDF and creates a list of strings
    – DocClassifyInference depends on both and classifies the document
    – KeywordDetection depends on paginated text and produces matches
  • CreateCSVOutput depends on doc classification and keyword detection and produces a formatted CSV of their outputs.

An example of the function that creates the expected_task_plan above might look like:

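(The original embed is unavailable; this reconstruction uses graphlib as described below, and the helper function names are my assumptions.)

from graphlib import TopologicalSorter

def get_task_plan(tasks: list[Task]) -> list[list[Task]]:
    check_duplicate_output_keys(tasks)
    sorter = TopologicalSorter()
    add_task_nodes(sorter, tasks)
    return sort_into_stages(sorter)

def check_duplicate_output_keys(tasks: list[Task]) -> None:
    # No two Tasks may declare the same output key.
    seen: set = set()
    for task in tasks:
        duplicates = seen & task.output_keys
        if duplicates:
            raise ValueError(f"Duplicate output keys: {duplicates}")
        seen |= task.output_keys

def add_task_nodes(sorter: TopologicalSorter, tasks: list[Task]) -> None:
    # A Task depends on whichever Tasks produce its input keys.
    producers = {key: task for task in tasks for key in task.output_keys}
    for task in tasks:
        deps = [producers[key] for key in task.input_keys if key in producers]
        sorter.add(task, *deps)

def sort_into_stages(sorter: TopologicalSorter) -> list[list[Task]]:
    # Drain the sorter in batches; each batch of simultaneously-ready
    # Tasks becomes one stage of the plan.
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = list(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages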

This function takes the list of Tasks, ensures that no two Task outputs have identical keys, adds the nodes to a sorter by interrogating each Task's input_keys and output_keys, and sorts them topologically. In our case the sorter comes from graphlib's TopologicalSorter, which is described here. Getting into what each of these functions does would take us too far afield, so we will move on to executing a task plan.

With the expected_task_plan shown above, an execute_task_plan() function is straightforward:

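(A sketch in that spirit; TaskThread is explained next. Note the parallel Tasks share the one Pipeline object, which they mutate in place.)

def execute_task_plan(task_plan: list[list[Task]], pipeline: Pipeline) -> Pipeline:
    for stage in task_plan:
        if len(stage) == 1:
            # Single Task in the stage: run it on the calling thread.
            pipeline = stage[0].execute(pipeline)
        else:
            # Multiple Tasks: one thread per Task, sharing the Pipeline state.
            threads = [
                TaskThread(target=task.execute, args=(pipeline,))
                for task in stage
            ]
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()  # TaskThread re-raises child exceptions here
    return pipeline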

Here we iterate over the task list, deciding between sequential and parallel execution. In the latter case, we use Python's threading.Thread class to create a thread per Task and use the idiomatic methods for starting and joining threads. Wait, then what is TaskThread?

In our case, we wanted to ensure that an exception in a child thread is always raised in the calling thread so the calling thread can exit immediately. So we extended the threading.Thread class with our own class called TaskThread. Overriding threading.Thread's .run() method is fairly common (so common that it's suggested in run()'s comments); we overrode run() to set an instance variable carrying the exception's content, and we check that variable at .join() time.

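(A sketch of that subclass.)

import threading

class TaskThread(threading.Thread):
    """threading.Thread that surfaces child exceptions at join() time."""

    def run(self):
        self.exc = None
        try:
            super().run()  # invokes the target passed to the constructor
        except Exception as exc:
            self.exc = exc  # stash it for the calling thread

    def join(self, timeout=None):
        super().join(timeout)
        if self.exc is not None:
            raise self.exc  # re-raise in the calling thread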

The calling thread can now try/except at .join() time.

Conclusion

With these structures in place, the file containing the automation service's primary functions was reduced from ~500 lines to ~90. Now when we create our thread pool to consume SQS messages, we get the Task plan like so: task_plan = get_task_plan(), and pass the task_plan into each thread. Once execution reaches the main function for performing document intelligence, what was previously a large section of difficult-to-read code becomes:

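(Again a sketch, since the embedded snippet is unavailable; the entry-point name and message shape are hypothetical.)

# Computed once at startup and passed to each SQS consumer thread.
task_plan = get_task_plan(tasks)

def perform_document_intelligence(encrypted_pdf_bytes: bytes) -> Pipeline:
    # Seed the Pipeline from the incoming message, then let the plan
    # drive every Task in order (and in parallel where possible).
    pipeline = Pipeline({"encrypted_pdf": encrypted_pdf_bytes})
    return execute_task_plan(task_plan, pipeline)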

The introduction of parallel processing of these Tasks consistently shaved time off of performing document intelligence (an average of about a minute). The real benefit of this change, however, will come in the future as we add more and more Tasks to the pipeline that can be processed in parallel.

While we’ve reduced the time-to-audit significantly from the former state-of-the-art, we are definitely not done. Features like the above will enable us to continue reducing this time while maintaining consistent processing times. We hope this blog helps you in your workflow orchestration research.

From byebug to ruby/debug

Posted on August 11, 2022 by Michael G
Switching to a new debugger and potentially changing your debugging process could be scary. So I hope this side-by-side comparison of byebug and ruby/debug can help you get familiar with ruby/debug and also make the transition smoother.

/usr/games removed from the default $PATH

Posted on August 11, 2022 by Michael G

In -current, /usr/games has been removed from the default $PATH. Theo Buehler (tb@) committed the change:

CVSROOT:	/cvs
Module name:	src
Changes by:	tb@cvs.openbsd.org	2022/08/10 01:40:37

Modified files:
	etc/skel       : dot.cshrc dot.profile 

Log message:
Remove games from the default $PATH in /etc/skel

The games are a playground for developers. Their code is very old and full
of bugs.

ok deraadt kn

So when you next sit down at a fresh snapshot install and want to do a quick rot13 or a round of tetris, you may need to specify the full path (e.g. /usr/games/tetris).

Alternatively, you could dig into the code and see if you can fix a bug or two.

Adopting Sigstore Incrementally

Posted on August 11, 2022 by Michael G

Developers, package maintainers, and enterprises that would like to adopt Sigstore may already sign published artifacts. This post explains how to adopt Sigstore incrementally.

The post Adopting Sigstore Incrementally appeared first on Linux.com.

Why I joined Mozilla’s Board of Directors

Posted on August 11, 2022 by Michael G

I first started working with digitalization and the internet when I became CEO of Scandinavia Online in 1998. It was the leading online service in the Nordics and we were pioneers and idealists. I learnt a lot from that experience: the endless opportunities, the tricky business models and the extreme ups and downs in hypes […]

The post Why I joined Mozilla’s Board of Directors appeared first on The Mozilla Blog.

The many derivatives of the CP/M operating system

Posted on August 11, 2022 by Michael G
But still, CP/M was, for a while, the industry-standard microcomputer OS, making Digital Research a powerful and important company. Wealthy companies that lose dominance over a market they formerly controlled don’t tend to just give up. Digital Research put a substantial R&D effort into expanding and enhancing CP/M, creating a large family of OSes. It had some significant wins and big sales. Some of those products are still in use. All those products are arguably “CP/M derivatives”, and as such, Bryan Sparks’ 2001 edict might have just open-sourced them all. One of the many giants we lost along the way.

Human rights in focus on the Czech and Serbian Wikipedia

Posted on August 10, 2022 by Michael G
The diversity of the community as well as inviting new editors from various backgrounds became a prime mover to focus on topics related to the…

Specbee: Marketo Webhook Integration with Drupal: Sync Lead Data from Marketo to Drupal in Real-Time

Posted on August 10, 2022 by Michael G
Marketo Webhook Integration with Drupal: Sync Lead Data from Marketo to Drupal in Real-Time
Shefali Shetty
09 Aug, 2022

When the Association of National Advertisers (ANA) names "Personalization" the "marketing word of the year", you can probably feel comfortable that it is a strategy that's here to stay. Personalized content adds a human touch to the customer experience, something that is priceless throughout their journey. Stats back this up, suggesting 90% of consumers find personalized content more appealing and get annoyed when content isn't personalized.

Marketing automation software giant, Marketo, helps B2B and B2C organizations engage and nurture potential leads while enabling marketers to create personalized marketing campaigns around them. 

Combining the power of Marketo with a content management system like Drupal is one of the best ways to present a completely seamless digital experience to customers. 

With Drupal – Marketo integration modules like Marketo MA, you can automate lead capturing, tracking, nurturing, personalization, analytics, and much more. Your Drupal website is also often connected to third-party services that need updated lead data from Marketo. Enter Webhooks. In one of our recent projects, we used Webhooks to get real-time data from Marketo so that content can be personalized for the customer when they log in. Read on to find out about the Drupal – Marketo integration and how to configure a Webhook to synchronize Marketo data with Drupal in real time.

Setting up Marketo in Drupal


Before moving on with setting up the Drupal – Marketo integration, note that this process assumes you have already set up your Marketo account and know how the platform works.

Installing the Marketo MA Drupal Module

On your Drupal admin setup, let’s go ahead and install the Marketo MA module from here. Next, go to Extend and enable the following modules (as shown in the below screengrab):

  • Marketo MA User
  • Marketo MA Webform
  • Marketo MA

[Screengrab: enabling the Marketo MA modules]

API Configuration

Now, let’s activate your Marketo integration by entering the Marketo account ID and other lead capturing details. Here, we are going to use the REST API method to track lead data instead of Munchkin JavaScript API. So, go ahead and enter the REST API configuration settings like the Client ID and the Client Secret.

[Screengrab: Marketo MA API configuration]

Field Definition

Here’s where you configure and map your user and Webform fields to the fields defined in your Marketo account (as shown in the below screengrab).

[Screengrab: field definition mapping]

User Settings

In this section, you can enable a trigger to update the lead in Marketo during events like a User login, registration/creation, and an update to the user profile. You can also choose the User fields that should trigger the update and map it to the Marketo field.

[Screengrab: user settings update triggers]

Adding the Webform Handler

Now select the Marketo MA webform handler to make sure that the lead is captured via webforms and sent to Marketo.

[Screengrab: adding the Marketo MA webform handler]

This setup will now let you add lead capturing, tracking, and nurturing capabilities on your Drupal site. You are now ready to send leads from Drupal to your Marketo platform.

How to Configure a Webhook to get Updated Lead Data from Marketo to Drupal

Your leads can come in from different sources. Some come in through your website's webforms, while others may be entered directly into Marketo's dashboard through different marketing channels.

Sometimes, the user data that is captured and sent over from your Drupal site might get updated on the Marketo dashboard. What happens when you need real-time updated data from Marketo to personalize Drupal content for that user?

The Use Case

Recently, our client's Drupal website required us to create a Webhook for their content personalization needs. They have a single sign-on system where users log in once and can access multiple site areas like events, member login, and shopping. After logging in, content on the Drupal website is personalized based on content segmentations like demographics, job levels, etc. This required the Drupal site to hold up-to-date user data, synchronized in real time with their Marketo system.

One not-very-feasible solution would be to make an API call to fetch lead data from Marketo on every user sign-in. However, not only would this slow down the sign-in process, it would also prove more expensive, as API requests are billed.

The Solution – Webhooks

Webhooks are basically API requests that are triggered by specific events. Marketo lets you register webhooks to connect to different third-party applications. For this use case, we configured a webhook to get real-time data from Marketo into the Drupal website. Let’s dive into the steps taken to implement webhooks for the Drupal Marketo integration.

Step 1: Create a custom module and define a route for API

First, you need to enable the HTTP Basic Authentication module in your Drupal setup.

marketo_webhook.routing.yml

marketo_webhook.webhook:
  path: '/webhooks/marketo'
  options:
    _auth: [ 'basic_auth' ]
  requirements:
    _user_is_logged_in: 'TRUE'
  defaults:
    _controller: '\Drupal\marketo_webhook\Controller\MarketoWebhookController::getMarketoLeads'
  methods: [POST]

Step 2: Create a controller for the API and store the data in custom fields

entityTypeManager = $entityTypeManager;
 }
 /**
  * {@inheritdoc}
  */
 public static function create(ContainerInterface $container) {
   return new static(
     $container->get('entity_type.manager')
   );
 }
 /**
  * Update user marketo fields.
  */
 public function getMarketoLeads(Request $request) {
   $payload = json_decode($request->getContent(), TRUE);
   $payload_log = implode(',', $payload);
   Drupal::logger('marketo_webhook')->notice($payload_log);
   if($payload){
     if($payload['mail']){
       $users = $this->entityTypeManager->getStorage('user')
         ->loadByProperties(['mail' => $payload['mail']]);
       $user = reset($users);
       if ($user) {
         if($payload['field_job_function'] != 'NA'){
           $user->set('field_job_function',$payload['field_job_function']);
         }
         $user->save();
         return JsonResponse::create('Success', 200);
       }
     }
   }
   return JsonResponse::create('Success', 400);
 }
}

Step 3: Create the Webhook and Marketo Integration

First, you will need to register the Webhook on Marketo. Hop over to your Marketo dashboard and click on the Webhooks option under the Admin >> Integration menu (as shown in the below screengrab).

[Screengrab: Webhooks under Admin >> Integration]

Next, create a New Webhook which will open up a dialog box where you can enter details like the Webhook Name, Description, URL, Request Type, Template, etc.

[Screengrab: the New Webhook dialog]

Give a name to the Webhook and an easily understandable description. Enter the URL to submit the web service request.

For example, https://www.specbee.com/webhooks/marketo is the API endpoint for our webhook. Add the Drupal username and password for basic authentication, like so:
https://username:password@www.specbee.com/webhooks/marketo

Click on the Insert Token button next to Template to add the fields of the Marketo object that you want to pass with the request. For example: "field_job_function": "{{lead.Job Function:default=NA}}". Set the default value to anything you like ('NA' in our case); Marketo will send this value when the field has no data.
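Before activating anything in Marketo, it can help to sanity-check the Drupal endpoint by hand. Here is a minimal Python sketch that simulates the POST Marketo will send, assuming the payload template above; the URL, credentials and field values are placeholders:

import requests

# Simulate the POST Marketo sends when the webhook fires.
# The payload mirrors the template tokens configured above.
payload = {
    "mail": "jane.doe@example.com",        # lead's email address
    "field_job_function": "Engineering",   # {{lead.Job Function:default=NA}}
}

resp = requests.post(
    "https://www.specbee.com/webhooks/marketo",  # placeholder endpoint
    json=payload,
    auth=("username", "password"),               # Drupal basic auth credentials
    timeout=10,
)
print(resp.status_code, resp.text)

A 200 response means the controller found and updated the matching user; a 400 means the payload or user lookup failed.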

[Screengrab: webhook name, URL and template]

Step 4: Create a Smart Campaign

To create the Webhook Marketo Integration, you will now need to set up a Smart Campaign. You can define your own Smart campaigns in Marketo that will run Marketo programs like calling a Webhook, sending out emails after a certain event, etc. A Smart campaign configuration has three parts: Smart List, Flow, and Schedule. You will need to add the trigger to the Webhook under Smart List.

  • Under Marketing Activities and within your program, create a new Smart campaign. 
  • Give the Smart Campaign a name and a description. Here we have called it Drupal Integration.
  • Under Smart List, you will find all available Triggers. Drag and drop the triggers you need into the Smart List. Here, we have selected the Person is Created trigger, but it fires only when a new lead is created. So let's also add a Data Value Changes trigger so the campaign fires when existing lead data is updated.
  • We have selected the Job Function and Job Level attributes under Person to trigger the webhook (as shown in the below screengrab).

[Screengrab: Smart List triggers]

  • Now, time to call the Webhook. Click on Flow and select the Call Webhook flow action on the right pane and drag it to the Flow. Select the name of the Webhook you created.

[Screengrab: the Call Webhook flow action]

  • Now that you have created the campaign to call the Webhook, let’s schedule it.

[Screengrab: scheduling the campaign]

  • In the Smart Campaign Settings, click on the Edit button to set how often you want the campaign to run. For our use case, we selected "every time", as we wanted the webhook to fire every time lead data gets updated. Save that setting and click on ACTIVATE.

[Screengrab: Smart Campaign qualification rules]

Step 5: Test it out!

Your campaign is now ready to test. You will be able to see all the activities, that is, the number of calls to the Webhook and other details under the Results tab of the Smart campaign.

So ideally, when you create a new lead (Person) or update the Job Level or Job Function field of an existing lead, Marketo should call the Webhook and the lead should be updated in your Drupal website's database as well.

This article would not have been possible without Prashanth’s help! Thank you!

Conclusion

Marketing automation platforms like Marketo can be a valuable addition to any organization’s marketing strategy to help engage, nurture, and ultimately convert leads. The use of Drupal as a content management system streamlines these activities. In this article, along with showing you how to integrate Marketo with Drupal, we have also covered how to configure webhooks that can let you get updated lead data from Marketo to Drupal. Need help customizing Drupal integrations with Marketo or any other third-party application? We’d be happy to help!

Author: Shefali Shetty

Meet Shefali Shetty, Director of Marketing at Specbee. An enthusiast for Drupal, she enjoys exploring and writing about the powerhouse. When not working or actively contributing back to the Drupal project, you can find her watching YouTube videos while trying to learn to play the Ukulele 🙂

Tags: Drupal, Drupal Development, Drupal Integration, Drupal 9, Drupal Planet, Drupal Module


How to create a single file Rails application?

Posted on August 10, 2022 by Michael G
Did you know you can create a single-file Rails application? In this short blog post I will show you how it can be done.
https://greg.molnar.io/blog/a-single-file-rails-application/

Kubernetes network stack fundamentals: How pods on different nodes communicate

Posted on August 10, 2022 by Michael G

Learn how pods communicate with each other when they are on different Kubernetes nodes. Read More at Enable Sysadmin

The post Kubernetes network stack fundamentals: How pods on different nodes communicate appeared first on Linux.com.
