Boto3とStripeをPythonでローカル実行
Virtualenvの立ち上げ Pythonでの開発環境をたてます。 パッケージインストール 必要なパッケージをいれましょう。AWSリソースへのアクセスはboto3を、Stripeへのアクセスはstripeを追加します。 […]
広告ここから
広告ここまで
目次
Virtualenvの立ち上げ
Pythonでの開発環境をたてます。
$ mkdir python_tasks
$ virtualenv --no-site-packages python_tasks
New python executable in /dev/python_tasks/python_tasks/bin/python
Installing setuptools, pip, wheel...done.
$ cd python_tasks
$ source bin/activate
(python_tasks) :python_tasks $
パッケージインストール
必要なパッケージをいれましょう。AWSリソースへのアクセスはboto3
を、Stripeへのアクセスはstripe
を追加します。
$ pip install --upgrade pip
$ pip install boto3
$ pip install stripe
$ pip list
Package Version
--------------- ---------
boto3 1.7.9
botocore 1.10.9
certifi 2018.4.16
chardet 3.0.4
docutils 0.14
futures 3.2.0
idna 2.6
jmespath 0.9.3
pip 10.0.1
python-dateutil 2.7.2
requests 2.18.4
s3transfer 0.1.13
setuptools 21.2.1
six 1.11.0
stripe 1.80.0
urllib3 1.22
wheel 0.29.0
以下のようなエラーが出た場合は、pipのバージョンを確認しましょう。pipのアップデートを忘れているケースだと思いますので、pip install --upgrade pip
で解決します。
Collecting boto3
Could not fetch URL https://pypi.python.org/simple/boto3/: There was a problem confirming the ssl certificate: [SSL: TLSV1_ALERT_PROTOCOL_VERSION] tlsv1 alert protocol version (_ssl.c:590) - skipping
Could not find a version that satisfies the requirement boto3 (from versions: )
No matching distribution found for boto3
Pythonのスクリプトファイルを用意
実行するコードを記述しましょう。例えばCognito User Poolsでアカウントの有効化が完了しているユーザーのみ取得するコードであれば、以下のようになります。
import boto3
cognito = boto3.client('cognito-idp')
UserPoolId = 'us-east-1_XXXX'
def getUsers():
response = cognito.list_users(
UserPoolId=UserPoolId,
Filter="cognito:user_status='CONFIRMED'"
)
return response
users = getUsers()
print(users)
Stripeのカスタマーのメールアドレス更新であれば、以下のような書き方ができます。
import stripe
stripeKey = 'sk_test_XXXXXXXXX'
class Stripe:
def __init__(self, stripe):
self.stripe = stripe
def setApiKey(self, apiKey):
self.stripe.api_key = apiKey
def getCustomer(self, customerId):
return self.stripe.Customer.retrieve(customerId)
def updateCustomerEmail(self, customerId, email):
customer = self.getCustomer(customerId)
customer.email = email
customer.save()
stripeClass = Stripe(stripe)
stripeClass.setApiKey(stripeKey)
stripeClass.updateCustomerEmail('cus_xxxx', '[email protected]')
応用:CognitoのメールアドレスをStripeの顧客に反映する
import boto3
import stripe
cognito = boto3.client('cognito-idp')
UserPoolId = 'us-east-1_XXXXX'
stripeKey = 'sk_test_XXXXX'
class Stripe:
def __init__(self, stripe):
self.stripe = stripe
def setApiKey(self, apiKey):
self.stripe.api_key = apiKey
def getCustomer(self, customerId):
return self.stripe.Customer.retrieve(customerId)
def updateCustomerEmail(self, customerId, email):
customer = self.getCustomer(customerId)
print(customer.email)
customer.email = email
customer.save()
def getUsers():
response = cognito.list_users(
UserPoolId=UserPoolId,
Filter="cognito:user_status='CONFIRMED'"
)
return response
def getUsersWithPagenation(PaginationToken):
response = cognito.list_users(
UserPoolId=UserPoolId,
PaginationToken=PaginationToken,
Filter="cognito:user_status='CONFIRMED'"
)
return response
class workflow:
def __init__(self, stripeApiKey):
stripeClass = Stripe(stripe)
stripeClass.setApiKey(stripeKey)
self.stripe = stripeClass
def syncEmailToStripeWorkflow(self, users):
self.syncEmailToStripe(users['Users'])
if 'PaginationToken' in users:
users = getUsersWithPagenation(users['PaginationToken'])
self.syncEmailToStripeWorkflow(users)
else:
print('end')
def getUserAttributes(self, attributes, name):
name_values = [x['Value'] for x in attributes if x['Name'] == name]
value = name_values[0] if len(name_values) else ''
return value
def runSyncEmailToStripe(self, user):
try :
stripeId = self.getUserAttributes(user['Attributes'], 'custom:stripeID')
if (not stripeId): return
email = self.getUserAttributes(user['Attributes'], 'email')
print(user['Username'])
print(email)
print(stripeId)
result = self.stripe.updateCustomerEmail(stripeId, email)
print(result)
except:
print('Fail to sync stripe customer')
def syncEmailToStripe(self, users):
for user in users:
if user['Enabled']:
self.runSyncEmailToStripe(user)
users = getUsers()
wf = workflow(stripeKey)
wf.syncEmailToStripeWorkflow(users)
StripeとCognito User Pools系を操作できるようになれば、有料会員サービスの実装なども比較的やりやすくなるので、みなさんぜひ触ってみてください。