抽象是编程的生命线。抽象采取复杂的操作,从而通过可操作的接口轻松工作。本文我们将讨论如何在基于 SQLAlchemy 的(web)应用程序中存储和验证密码(或其加密散列)的方式。并且基于这种方式,我们可以升级密码的安全性,因为很可能会发生旧的加密方案被破坏或被证明有不足,此时就要有新的加密方案被引入。





  1. import bcrypt
  2. from sqlalchemy import Column, Integer, Text
  3. class User(Base):
  4. __tablename__ = 'user'
  5. id = Column(Integer, primary_key=True)
  6. name = Column(Text)
  7. password = Column(Text)
  8. pwhash = bcrypt.hashpw(login_data['password'], user.password)
  9. if user.password == pwhash:
  10. print 'Access granted'

上面的代码片段使用 Python bcrypt 包作为密码加密组件,你可能会使用其他的加密组件例如:PBKDF2scrypt。不管怎样,只有可以肯定的是:它不应该有一个固定的盐值,它也应该绝对不会是一轮优化速度的 SHA1SHA2 功能。其实就是应该使用目前业界通用的历经考验的算法。

假设散列算法是安全的,那么这段代码对于达到我们的目的是完美的。但是,大多数应用程序都是琐碎的,我们会发现大多数的一次性脚本往往会被 CV 重复使用。随着时间的推移,你发现自己面临着越来越多的重复代码,仅仅用来验证用户的密码。所以,是时候做些什么事情了,该是重构的时候了。



在这种情况下,它的委托给一个类 User。我们给 User 类添加了一种方法,将当前密码的哈希值和需要尝试计算的值作为参数传递进去,并返回它们是否相同:

  1. import bcrypt
  2. from sqlalchemy import Column, Integer, Text
  3. class User(Base):
  4. __tablename__ = 'user'
  5. id = Column(Integer, primary_key=True)
  6. name = Column(Text)
  7. password = Column(Text)
  8. def verify_password(self, password):
  9. pwhash = bcrypt.hashpw(password, self.password)
  10. return self.password == pwhash
  11. if user.verify_password(login_data['password']):
  12. print 'Access granted'


  1. 多个模型可能需要密码验证; 您的应用程序可能有常规和管理用户的单独模型,或包括需要密码的其他原语。这些都需要自己的验证方法。一个解决方案是抽象出一个单一的 mixin 类,用来处理密码验证。
  2. 密码设置仍需要处理。这可以用 @validates 这个装饰器来装饰需要创建哈希的字段。为了使其可重用,将它添加到上面提到的 mixin 类将是一个好主意。
  3. 更重要的,我认为这是一个不必要的抽象,不必要暴露实现细节(bcrypt),应该有一个更直观的接口。


更直观的接口应该是比较符,而不是函数调用。我们想要实现的是检查提供的密码是否等于我们所拥有的密码(即使我们只能真正地比较明文与散列)。为此,我们可以写一个相当简单的类,它需要一个 bcrypt 哈希,它提供了自己的 equal 比较器。

  1. import bcrypt
  2. class PasswordHash(object):
  3. def __init__(self, hash_):
  4. assert len(hash_) == 60, 'bcrypt hash should be 60 chars.'
  5. assert hash_.count('$'), 'bcrypt hash should have 3x "$".'
  6. self.hash = str(hash_)
  7. self.rounds = int(self.hash.split('$')[2])
  8. def __eq__(self, candidate):
  9. """Hashes the candidate string and compares it to the stored hash."""
  10. if isinstance(candidate, basestring):
  11. if isinstance(candidate, unicode):
  12. candidate = candidate.encode('utf8')
  13. return bcrypt.hashpw(candidate, self.hash) == self.hash
  14. return False
  15. def __repr__(self):
  16. """Simple object representation."""
  17. return '<{}>'.format(type(self).__name__)
  18. @classmethod
  19. def new(cls, password, rounds):
  20. """Creates a PasswordHash from the given password."""
  21. if isinstance(password, unicode):
  22. password = password.encode('utf8')
  23. return cls(bcrypt.hashpw(password, bcrypt.gensalt(rounds)))

创建新 PasswordHash 可以从一个明文密码来实现类方法,或只需实例化一个现有的哈希值。将现有的哈希与明文密码进行比较是非常简单明了的:

  1. if user.password == login_data['password']:
  2. print 'Access granted'


密码哈希本质上是一个简单的字符串。我们要做的是确保在我们的 PasswordHash 中封装的哈希值存储在数据库中,并且当我们从数据库读取一个哈希值时返回一个 PasswordHash 对象。为此,SQLAlchemy 为我们提供了 TypeDecorators,它允许对我们新的密码类型的增加新的功能。

使用 TypeDecorator 构造块,我们构造一个新的密码类型,我们可以在列规范中使用。有一些事情,我们需要关注:

  1. 选择要扩展的数据库类型。在这个例子中我使用的是 Text,但根据底层数据库的不同可能有更好的类型。
  2. 一种将 PasswordHash 对象转换为适合实现器类型的值的方法。这是 process_bind_param() 方法。
  3. 一种将数据库值转换为我们想要在 Python 运行时中使用的 PasswordHash 的方法。这是 process_result_value() 方法。
  1. from sqlalchemy import Column, Integer, Text, TypeDecorator
  2. from sqlalchemy.orm import validates
  3. class Password(TypeDecorator):
  4. """Allows storing and retrieving password hashes using PasswordHash."""
  5. impl = Text
  6. def __init__(self, rounds=12, **kwds):
  7. self.rounds = rounds
  8. super(Password, self).__init__(**kwds)
  9. def process_bind_param(self, value, dialect):
  10. """Ensure the value is a PasswordHash and then return its hash."""
  11. return self._convert(value).hash
  12. def process_result_value(self, value, dialect):
  13. """Convert the hash to a PasswordHash, if it's non-NULL."""
  14. if value is not None:
  15. return PasswordHash(value)
  16. def validator(self, password):
  17. """Provides a validator/converter for @validates usage."""
  18. return self._convert(password)
  19. def _convert(self, value):
  20. """Returns a PasswordHash from the given string.
  21. PasswordHash instances or None values will return unchanged.
  22. Strings will be hashed and the resulting PasswordHash returned.
  23. Any other input will result in a TypeError.
  24. """
  25. if isinstance(value, PasswordHash):
  26. return value
  27. elif isinstance(value, basestring):
  28. return PasswordHash.new(value, self.rounds)
  29. elif value is not None:
  30. raise TypeError(
  31. 'Cannot convert {} to a PasswordHash'.format(type(value)))
  32. class User(Base):
  33. __tablename__ = 'user'
  34. id = Column(Integer, primary_key=True)
  35. name = Column(Text)
  36. password = Column(Password)
  37. # Or specify a cost factor other than the default 12
  38. # password = Column(Password(rounds=10))
  39. @validates('password')
  40. def _validate_password(self, key, password):
  41. return getattr(type(self), key).type.validator(password)

@validates 装饰器是可选的,但我们需要确保密码被赋值时会转换为 PasswordHash,并且保存到 session 中是透明的。这其实就是说是在赋值之后,刷新到数据库之间完成的 Hash转换。这也意味着不会有一个纯文本值存储在用户对象上,这也就意味着它不会意外泄漏,这绝对是一个惊喜。



当之前列出缺点时,我提到了一些代码重用可以用一个 mixin 类建立。这仍然可以用上面的解决方案来实现,利用 SQLAlchemy 支持混入列的特性。

以下代码段定义了这样一个 mixin,然后被两个模型 UserProtectedFile 使用。这两个模型都将有一个密码列属性,包括将字符串转换为正确的加密字段。

  1. class HasPassword(object):
  2. password = Column(Password)
  3. @validates('password')
  4. def _validate_password(self, key, password):
  5. return getattr(type(self), key).type.validator(password)
  6. class User(HasPassword, Base):
  7. __tablename__ = 'user'
  8. id = Column(Integer, primary_key=True)
  9. name = Column(Text)
  10. class ProtectedFile(HasPassword, Base):
  11. __tablename__ = 'protected_file'
  12. id = Column(Integer, primary_key=True)
  13. filename = Column(Text)







  1. class PasswordHash(Mutable):
  2. def __init__(self, hash_, rounds=None):
  3. assert len(hash_) == 60, 'bcrypt hash should be 60 chars.'
  4. assert hash_.count('$'), 'bcrypt hash should have 3x "$".'
  5. self.hash = str(hash_)
  6. self.rounds = int(self.hash.split('$')[2])
  7. self.desired_rounds = rounds or self.rounds
  8. def __eq__(self, candidate):
  9. """Hashes the candidate string and compares it to the stored hash.
  10. If the current and desired number of rounds differ, the password is
  11. re-hashed with the desired number of rounds and updated with the results.
  12. This will also mark the object as having changed (and thus need updating).
  13. """
  14. if isinstance(candidate, basestring):
  15. if isinstance(candidate, unicode):
  16. candidate = candidate.encode('utf8')
  17. if self.hash == bcrypt.hashpw(candidate, self.hash):
  18. if self.rounds < self.desired_rounds:
  19. self._rehash(candidate)
  20. return True
  21. return False
  22. def __repr__(self):
  23. """Simple object representation."""
  24. return '<{}>'.format(type(self).__name__)
  25. @classmethod
  26. def coerce(cls, key, value):
  27. """Ensure that loaded values are PasswordHashes."""
  28. if isinstance(value, PasswordHash):
  29. return value
  30. return super(PasswordHash, cls).coerce(key, value)
  31. @classmethod
  32. def new(cls, password, rounds):
  33. """Returns a new PasswordHash object for the given password and rounds."""
  34. if isinstance(password, unicode):
  35. password = password.encode('utf8')
  36. return cls(cls._new(password, rounds))
  37. @staticmethod
  38. def _new(password, rounds):
  39. """Returns a new bcrypt hash for the given password and rounds."""
  40. return bcrypt.hashpw(password, bcrypt.gensalt(rounds))
  41. def _rehash(self, password):
  42. """Recreates the internal hash and marks the object as changed."""
  43. self.hash = self._new(password, self.desired_rounds)
  44. self.rounds = self.desired_rounds
  45. self.changed()


  1. 继承可变允许一个需要持久化状态的内部变化的信号。
  2. 知道是否升级,所需的复杂性需要设置和下存储到哈希的当前的复杂性。
  3. 当提供的密码正确时,根据当前情况检查所需的复杂性。如果当前复杂度太低,我们重新刷新密码,更新复杂性并标记更改。
  4. 所述裹胁()方法是可变的所需的接口的一部分。它不会为这个类做太多,但仍然需要。
  5. 重用代码,_new()现在是负责从纯文本和复杂的参数创建一个新的bcrypt哈希值。



  1. class Password(TypeDecorator):
  2. """Allows storing and retrieving password hashes using PasswordHash."""
  3. impl = Text
  4. def __init__(self, rounds=12, **kwds):
  5. self.rounds = rounds
  6. super(Password, self).__init__(**kwds)
  7. def process_bind_param(self, value, dialect):
  8. """Ensure the value is a PasswordHash and then return its hash."""
  9. return self._convert(value).hash
  10. def process_result_value(self, value, dialect):
  11. """Convert the hash to a PasswordHash, if it's non-NULL."""
  12. if value is not None:
  13. return PasswordHash(value, rounds=self.rounds)
  14. def validator(self, password):
  15. """Provides a validator/converter for @validates usage."""
  16. return self._convert(password)
  17. def _convert(self, value):
  18. """Returns a PasswordHash from the given string.
  19. PasswordHash instances or None values will return unchanged.
  20. Strings will be hashed and the resulting PasswordHash returned.
  21. Any other input will result in a TypeError.
  22. """
  23. if isinstance(value, PasswordHash):
  24. return value
  25. elif isinstance(value, basestring):
  26. return PasswordHash.new(value, self.rounds)
  27. elif value is not None:
  28. raise TypeError(
  29. 'Cannot convert {} to a PasswordHash'.format(type(value)))



  1. class User(Base):
  2. __tablename__ = 'user'
  3. id = Column(Integer, primary_key=True)
  4. name = Column(Text)
  5. password = Column(Password(rounds=13))
  6. @validates('password')
  7. def _validate_password(self, key, password):
  8. return getattr(type(self), key).type.validator(password)
  9. # Create plain user with default key complexity
  10. john = User(name='John', password='flatten-shallow-ideal')
  11. # Create an admin user with higher key derivation complexity
  12. administrator = User(
  13. name='Simon',
  14. password=PasswordHash.new('working-as-designed', 15))







