Boto3とStripeをPythonでローカル実行

Virtualenvの立ち上げ Pythonでの開発環境をたてます。 パッケージインストール 必要なパッケージをいれましょう。AWSリソースへのアクセスはboto3を、Stripeへのアクセスはstripeを追加します。 […]

広告ここから
広告ここまで

目次

    Virtualenvの立ち上げ

    Pythonでの開発環境をたてます。

    $ mkdir python_tasks
    $ virtualenv --no-site-packages python_tasks
    New python executable in /dev/python_tasks/python_tasks/bin/python
    Installing setuptools, pip, wheel...done.
    $ cd python_tasks
    $ source bin/activate
    (python_tasks) :python_tasks $ 
    

    パッケージインストール

    必要なパッケージをいれましょう。AWSリソースへのアクセスはboto3を、Stripeへのアクセスはstripeを追加します。

    $ pip install --upgrade pip
    $ pip install boto3
    $ pip install  stripe
    $ pip list
    Package         Version  
    --------------- ---------
    boto3           1.7.9    
    botocore        1.10.9   
    certifi         2018.4.16
    chardet         3.0.4    
    docutils        0.14     
    futures         3.2.0    
    idna            2.6      
    jmespath        0.9.3    
    pip             10.0.1   
    python-dateutil 2.7.2    
    requests        2.18.4   
    s3transfer      0.1.13   
    setuptools      21.2.1   
    six             1.11.0   
    stripe          1.80.0   
    urllib3         1.22     
    wheel           0.29.0   
    

    以下のようなエラーが出た場合は、pipのバージョンを確認しましょう。pipのアップデートを忘れているケースだと思いますので、pip install --upgrade pipで解決します。

    Collecting boto3
      Could not fetch URL https://pypi.python.org/simple/boto3/: There was a problem confirming the ssl certificate: [SSL: TLSV1_ALERT_PROTOCOL_VERSION] tlsv1 alert protocol version (_ssl.c:590) - skipping
      Could not find a version that satisfies the requirement boto3 (from versions: )
    No matching distribution found for boto3

    Pythonのスクリプトファイルを用意

    実行するコードを記述しましょう。例えばCognito User Poolsでアカウントの有効化が完了しているユーザーのみ取得するコードであれば、以下のようになります。

    import boto3
    cognito = boto3.client('cognito-idp')
    UserPoolId = 'us-east-1_XXXX'
    
    def getUsers():
        response = cognito.list_users(
            UserPoolId=UserPoolId,
            Filter="cognito:user_status='CONFIRMED'"
        )
        return response
    
    users = getUsers()
    print(users)

    Stripeのカスタマーのメールアドレス更新であれば、以下のような書き方ができます。

    import stripe
    stripeKey = 'sk_test_XXXXXXXXX'
    
    class Stripe:
        def __init__(self, stripe):
            self.stripe = stripe
    
        def setApiKey(self, apiKey):
            self.stripe.api_key = apiKey
    
        def getCustomer(self, customerId):
            return self.stripe.Customer.retrieve(customerId)
    
        def updateCustomerEmail(self, customerId, email):
            customer = self.getCustomer(customerId)
            customer.email = email
            customer.save()
    
    stripeClass = Stripe(stripe)
    stripeClass.setApiKey(stripeKey)
    stripeClass.updateCustomerEmail('cus_xxxx', '[email protected]')

    応用:CognitoのメールアドレスをStripeの顧客に反映する

    import boto3
    import stripe
    
    cognito = boto3.client('cognito-idp')
    UserPoolId = 'us-east-1_XXXXX'
    stripeKey = 'sk_test_XXXXX'
    
    class Stripe:
        def __init__(self, stripe):
            self.stripe = stripe
    
        def setApiKey(self, apiKey):
            self.stripe.api_key = apiKey
    
        def getCustomer(self, customerId):
            return self.stripe.Customer.retrieve(customerId)
    
        def updateCustomerEmail(self, customerId, email):
            customer = self.getCustomer(customerId)
            print(customer.email)
            customer.email = email
            customer.save()
    
    
    def getUsers():
        response = cognito.list_users(
            UserPoolId=UserPoolId,
            Filter="cognito:user_status='CONFIRMED'"
        )
        return response
    
    
    def getUsersWithPagenation(PaginationToken):
        response = cognito.list_users(
            UserPoolId=UserPoolId,
            PaginationToken=PaginationToken,
            Filter="cognito:user_status='CONFIRMED'"
        )
        return response
    
    
    class workflow:
        def __init__(self, stripeApiKey):
            stripeClass = Stripe(stripe)
            stripeClass.setApiKey(stripeKey)
            self.stripe = stripeClass
    
    
        def syncEmailToStripeWorkflow(self, users):
            self.syncEmailToStripe(users['Users'])
            if 'PaginationToken' in users:
                users = getUsersWithPagenation(users['PaginationToken'])
                self.syncEmailToStripeWorkflow(users)
            else:
                print('end')
    
        def getUserAttributes(self, attributes, name):
            name_values = [x['Value'] for x in attributes if x['Name'] == name]
            value = name_values[0] if len(name_values) else ''
            return value
    
        def runSyncEmailToStripe(self, user):
            try :
                stripeId = self.getUserAttributes(user['Attributes'], 'custom:stripeID')
                if (not stripeId): return
                email = self.getUserAttributes(user['Attributes'], 'email')
                print(user['Username'])
                print(email)
                print(stripeId)
                result = self.stripe.updateCustomerEmail(stripeId, email)
                print(result)
            except:
                print('Fail to sync stripe customer')
    
    
        def syncEmailToStripe(self, users):
            for user in users:
                if user['Enabled']:
                    self.runSyncEmailToStripe(user)
    
    users = getUsers()
    wf = workflow(stripeKey)
    wf.syncEmailToStripeWorkflow(users)

    StripeとCognito User Pools系を操作できるようになれば、有料会員サービスの実装なども比較的やりやすくなるので、みなさんぜひ触ってみてください。

    広告ここから
    広告ここまで
    Home
    Search
    Bookmark