Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<!DOCTYPE qhelp PUBLIC
"-//Semmle//qhelp//EN"
"qhelp.dtd">
<qhelp>

<overview>
<p>
Passing user-controlled sources into SQL queries used in SQLAlchemy methods can result in a SQL injection flaw.
This tainted SQL query containing a user-controlled source when passed into a SQLAlchemy method can execute a malicious query in a SQL database such as Postgres, MySQL, or SQLite.
</p>

<p>
Because a user-controlled source is passed into the query, the malicious user can have complete control over the query itself.
When the tainted query is executed, the malicious user can commit malicious actions such as bypassing role restrictions or accessing and modifying restricted data in the SQL database.
</p>
</overview>

<recommendation>
<p>
SQL injections can be prevented by escaping user-input's special characters that are passed into the SQL query from the user-supplied source.
Alternatively, using a sanitize library such as Sqlescapy will ensure that user-supplied sources can not act as a malicious query.
</p>
<recommendation>

<example>
<p>In the example below, the user-supplied source is passed to a SQLAlchemy method that queries the SQLite database.</p>
<sample src="SqlAlchemyInjection-Bad.py" />
<p> This can be fixed by using a sanitizer library like Sqlescapy as shown in this annotated code version below.</p>
<sample src="SqlAlchemyInjection-Good.py" />
<example>

<references>
<li>OWASP: <a href="https://owasp.org/www-community/attacks/SQL_Injection">SQL Injection</a></li>
<li>SQLAlchemy: <a href="https://docs.sqlalchemy.org/en/14/index.html">Documentation</a></li>
</references>
</qhelp>
19 changes: 19 additions & 0 deletions python/ql/src/experimental/Security/CWE-089/SqlAlchemyInjection.ql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* @name SqlAlchemy Injection
* @description Building a SQL query from user-controlled sources is vulnerable to insertion of
* malicious SQL code by the user in SQLAlchemy.
* @kind path-problem
* @problem.severity error
* @id python/sqlalchemy-injection
* @tags experimental
* security
* external/cwe/cwe-089
*/

import python
import experimental.semmle.python.security.injection.SqlAlchemyInjection

from DataFlow::PathNode source, DataFlow::PathNode sink, SqlAlchemyInjectionConfig sqlAlchemyInjectionConfig
where sqlAlchemyInjectionConfig.hasFlowPath(source, sink)
select sink, source, sink, "$@ SQL query contains an unsanitized $@", sink, "This", source,
"user-provided value"
14 changes: 14 additions & 0 deletions python/ql/src/experimental/semmle/python/Concepts.qll
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,17 @@ private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.dataflow.new.TaintTracking
private import experimental.semmle.python.Frameworks

module SqlSanitizer {
abstract class Range extends DataFlow::Node {
abstract DataFlow::Node getSanitizerNode();
}
}

class SqlSanitizer extends DataFlow::Node {
SqlSanitizer::Range range;

SqlSanitizer() { this = range }

DataFlow::Node getSanitizerNode() { result = range.getSanitizerNode() }
}
67 changes: 67 additions & 0 deletions python/ql/src/experimental/semmle/python/frameworks/Stdlib.qll
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,71 @@ private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.TaintTracking
private import semmle.python.dataflow.new.RemoteFlowSources
private import experimental.semmle.python.Concepts
private import semmle.python.Concepts
private import semmle.python.ApiGraphs

private module SQLAlchemy {
private class SqlAlchemyQuerySinkMethods extends string {
SqlAlchemyQuerySinkMethods() { this in [
"filter", "filter_by", "having", "group_by", "order_by"
]
}
}

private API::Node sqlAlchemySessionInstance() {
result = API::moduleImport("sqlalchemy.orm").getMember("Session").getReturn() or
result = API::moduleImport("sqlalchemy.orm").getMember("sessionmaker").getReturn().getReturn()
}

private API::Node sqlAlchemyQueryInstance() {
result = sqlAlchemySessionInstance().getMember("query").getReturn()
}

private API::Node sqlAlchemyEngineInstance() {
result = API::moduleImport("sqlalchemy").getMember("create_engine").getReturn()
}

private class SqlAlchemyInjectionSink extends DataFlow::CallCfgNode, SqlExecution::Range {
DataFlow::Node sql;

SqlAlchemyInjectionSink() {
this in [
any(SqlAlchemyExecuteInjectionSink injectionSink)
any(SqlAlchemyQueryInjectionSink injectionSink)
] and
sql = this.getArg(0)
}

override DataFlow::Node getSql() { result = sql }
}

private class SqlAlchemyExecuteInjectionSink extends DataFlow::Node {
SqlAlchemyExecuteInjectionSink() {
this in [
sqlAlchemySessionInstance().getMember("execute").getACall(),
sqlAlchemyEngineInstance().getMember("connect").getReturn().getMember("execute").getACall(),
sqlAlchemyEngineInstance().getMember("begin").getReturn().getMember("execute").getACall()
] and
sql = this.getArg(0)
}
}

private class SqlAlchemyQueryInjectionSink extends DataFlow::Node {
SqlAlchemyQueryInjectionSink() {
this in [
sqlAlchemyQueryInstance().getMember(any(SqlAlchemyQuerySinkMethods sinkMethod)).getACall(),
sqlAlchemySessionInstance().getMember("scalar").getACall()
] and
sql = this.getArg(0)
}
}

private class SqlSanitizerCall extends DataFlow::CallCfgNode, NoSQLSanitizer::Range {
SqlSanitizerCall() {
this =
API::moduleImport("sqlescapy").getMember("sqlescape").getACall()
}

override DataFlow::Node getSanitizerNode() { result = this.getArg(0) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import experimental.semmle.python.Concepts
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import experimental.semmle.python.frameworks.Stdlib

class SqlAlchemyInjectionConfig extends TaintTracking::Configuration {
SqlAlchemyInjectionConfig() { this = "SqlAlchemyInjectionConfig" }

override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }

override predicate isSink(DataFlow::Node sink) { sink instanceof SqlAlchemyInjectionSink }

override predicate isSanitizer(DataFlow::Node sanitizer) {
sanitizer = any(SqlSanitizer SqlSanitizer).getSanitizerNode()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import sqlalchemy
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, backref, sessionmaker, joinedload
from sqlalchemy.sql import text
from sqlescapy import sqlescape


engine = create_engine('sqlite:///:memory:', echo=True)
Base = declarative_base()

class User(Base):
__tablename__ = 'users'

id = Column(Integer, primary_key=True)
name = Column(String)

Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

ed_user = User(name='ed')
ed_user2 = User(name='george')

session.add(ed_user)
session.add(ed_user2)

session.commit()

malicious_user_input = sys.argv[1]
malicious_user_input = text("name='{}'".format(malicious_user_input))
session.query(User).filter(malicious_user_input).count()
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy
import json
from sqlalchemy.sql import text


app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)

class User(db.Model):
uid = db.Column(db.Integer, primary_key=True, autoincrement=True)
email = db.Column(db.String(50))

def __init__(self, email):
self.email = email

db.create_all()

user1 = User(email='admin@example.com')
user2 = User(email='admin2@example.com')
db.session.add(user1)
db.session.add(user2)
db.session.commit()

@app.route('/')
def user_count():
user_info = json.loads(request.args['user_info'])
sql_query = "email='{}'".format(user_info['email'])
user_count = db.session.query(User).filter(sql_query).count()

return "Users found: {}".format(user_count)

# if __name__ == '__main__':
# app.run(debug=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy
import json
from sqlalchemy.sql import text
from sqlescapy import sqlescape


app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)

class User(db.Model):
uid = db.Column(db.Integer, primary_key=True, autoincrement=True)
email = db.Column(db.String(50))

def __init__(self, email):
self.email = email

db.create_all()

user1 = User(email='admin@example.com')
user2 = User(email='admin2@example.com')
db.session.add(user1)
db.session.add(user2)
db.session.commit()

@app.route('/')
def user_count():
user_info = json.loads(request.args['user_info'])
sanitized_input = sqlescape(user_info['email']))
sql_query = "email='{}'".format(sanitized_input)
user_count = db.session.query(User).filter(sql_query).count()

return "Users found: {}".format(user_count)

# if __name__ == '__main__':
# app.run(debug=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import sqlalchemy
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, backref, sessionmaker, joinedload
from sqlalchemy.sql import text
from sqlescapy import sqlescape


engine = create_engine('sqlite:///:memory:', echo=True)
Base = declarative_base()

class User(Base):
__tablename__ = 'users'

id = Column(Integer, primary_key=True)
name = Column(String)

Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

ed_user = User(name='ed')
ed_user2 = User(name='george')

session.add(ed_user)
session.add(ed_user2)

session.commit()


malicious_user_input = sys.argv[1]
escaped_input = sqlescape(malicious_user_input)
sql_text = "name='{}'".format(escaped_input)
sql_query = text(sql_text)

session.query(User).filter(malicious_user_input).count()