Buy Me a Coffee? Your support is much appreciated!

Source Code:  

demo.py

from youtube import YouTube

client_file = 'client-secret.json'
yt = YouTube(client_file)
yt.init_service()

# exampe 1. List channel subscriptions
subscriptions = yt.list_subscriptions()
print('Total Subscription: {0}'.format(len(subscriptions)))

for subscription in subscriptions:
    print('subscription id: {0}'.format(subscription['id']))
    print('channelId id: {0}'.format(subscription['snippet']['channelId']))
    print('channelId title: {0}'.format(subscription['snippet']['title']))
    print('channelId created: {0}'.format(subscription['snippet']['publishedAt']))
    print()

# example 2. Unsubscribe channels
for subscription in subscriptions:
    if 'amazon' in subscription['snippet']['title'].lower():
        yt.remove_subscription(subscription['id'])
        print('channel {0} unsubscribed'.format(subscription['snippet']['title']))

# example 3. Subscribe to a channel
response = yt.add_subscription('UCvVZ19DRSLIC2-RUOeWx8ug')
print(response)




YouTube.py

from google_apis import create_service

class YouTube:
    API_NAME = 'youtube'
    API_VERSION = 'v3'
    SCOPES = ['https://www.googleapis.com/auth/youtube']

    def __init__(self, client_file):
        self.client_file = client_file
        self.service = None

    def init_service(self):
        self.service = create_service(self.client_file, self.API_NAME, self.API_VERSION, self.SCOPES)

    def list_channel_subscriptions(self, channel_id, filter_channels=None, order_by='alphabetical'):
        """
        order_by: {alphabetical;relevance;unread}
        """
        subscriptions = []
        try:
            response = self.service.subscriptions().list(
                channelId=channel_id,      
                part='snippet',
                forChannelId=filter_channels,
                maxResults=50,
                order=order_by
            ).execute()

            subscriptions.extend(response.get('items'))
            next_page_token = response.get('nextPageToken')

            while next_page_token:
                response = self.service.subscriptions().list(
                    channelId=channel_id,
                    part='snippet',
                    forChannelId=filter_channels,
                    maxResults=50,
                    order=order_by,
                    pageToken=next_page_token
                ).execute()
                subscriptions.extend(response.get('items'))
                next_page_token = response.get('nextPageToken')
            return subscriptions
        except Exception as e:
            print(e.error_details[0]['message'])
            return None

    def list_subscriptions(self, filter_channels=None, order_by='alphabetical'):
        """
        order_by: {alphabetical;relevance;unread}
        """        
        subscriptions = []
        response = self.service.subscriptions().list(
            mine=True,
            part='snippet',
            forChannelId=filter_channels,
            maxResults=50,
            order='alphabetical'
        ).execute()

        subscriptions.extend(response.get('items'))
        next_page_token = response.get('nextPageToken')

        while next_page_token:
            response = self.service.subscriptions().list(
                mine=True,
                part='snippet',
                forChannelId=filter_channels,
                maxResults=50,
                order='alphabetical',
                pageToken=next_page_token
            ).execute()
            subscriptions.extend(response.get('items'))
            # subscriptions.extend(map(lambda v: v['snippet'], response.get('items')))
            next_page_token = response.get('nextPageToken')
        return subscriptions

    def add_subscription(self, channel_id):
        try:
            response = self.service.subscriptions().insert(
                part='snippet',
                body={'snippet': {'resourceId': {'kind': 'youtube#channel', 'channelId': channel_id}}}
            ).execute()
            print('channel {0} added'.format(channel_id))
            return response
        except Exception as e:
            print(e.error_details[0]['message'])
            return None            

    def remove_subscription(self, subscription_id):
        self.service.subscriptions().delete(
            id=subscription_id
        ).execute()