Recently I ran into a very nasty issue with Airflow’s workers getting stuck in AWS. The workers would process a job and then randomly got stuck. For me, the worker kept getting stuck when it connected to Box API and Salesforce. This issue took a while to find the root cause and finally a fix.
I was using the latest Box Python SDK and SimpleSalesforce to connect to the Salesforce application.
What I did to troubleshoot the issue:
Turn Airflow log level to Debug. Update the
airflow.cfg
file# airflow.cfg # Logging level. Originally INFO logging_level = DEBUG
The log was a LOT noisier but there were more information to help debugging the issue. For me, I saw the job was still running and alive as there were many
[heartbeat]
messages generated in the log by the worker, signifying that the job was still running, but was stuck waiting.Since the issue happened very randomly at any moment, I wanted to make sure our servers did not hit the file limit, and sure enough, they didn’t. The file limits and file descriptors were ok. CPU and RAM utilization was within the normal range.
On the server that was having an ongoing-but-stuck job, I ran
netcat -p
and it showed a lot of open connections, but there were a few connections in theCLOSE_WAIT
state. These connections were opened by the Airflow workers.CLOSE_WAIT
means that the Airflow job was trying to make a connection to the 3rd party API and the request was terminated on the 3rd party end, but our Python script was still waiting.If I issued a
kill -15 <pid of CLOSE_WAIT process>
then the job would then resume.I decided to dig deeper into the libraries codes. Both SimpleSalesforce and Box Python SDK were using the
requests
module to handle the networking part. Underneath,requests
uses theurllib3
to provide asession
connection pool. It turns out that the code was hanging at trying to read content from the SSL connection.
After further digging and trying different things, I realized that the requests
connection did not have any default timeout. This means that if something happens to the connection (network connectivity), the code will be stuck forever. In this case, our Airflow workers did get stuck indefinitely and we had to manually restart the job.
Since we don’t know why AWS network was having such connectivity issue, the solution is to monkeypatch the connection to have a default timeout amount.
I created a new dag_monkeypatch_requests.py
in the dags/
folder so that Airflow would scan and import the code in at run time:
# dags/dag_monkeypatch_requests.py
import logging
def patch_request_set_default_timeout():
import requests
patched = getattr(requests.sessions.Session, "__request_patched", False)
if patched:
return
logging.info("*** [Monkeypatch] Patching requests - set timeout")
def request_patch(slf, *args, **kwargs):
timeout = kwargs.pop("timeout", None)
if timeout is None:
timeout = (5, 120)
logging.debug("*** set timeout to ", timeout)
return slf.request_original(*args, **kwargs, timeout=timeout)
setattr(requests.sessions.Session, "request_original", requests.sessions.Session.request)
setattr(requests.sessions.Session, "__request_patched", True)
requests.sessions.Session.request = request_patch
patch_request_set_default_timeout()
This code monkeypatches the requests.sessions.Session.request
method to have a default timeout of (5, 120)
, which means a 5 seconds timeout for CONNECT
(the initial connect attempt) and 120 seconds for the whole connection duration. If the request takes longer than this, it will throw a Timeout exception instead of just hanging.
Once we implemented the monkeypatch, our Airflow cluster became normal again. This patch would only apply once so it’s safe for Airflow to reload this code multiple times.
The key take away is that don’t be afraid of reading external libraries to understand what’s going on, and don’t assume anything when debugging an issue.
Leave a Reply