#!/usr/local/bin/python3
# coding=gbk
# http://www.cnblogs.com/txw1958/
#
"""Manual exerciser for the Sina Weibo API through the ``weibopy`` client.

Every method of :class:`Test` performs one API call and prints selected
attributes of the returned object(s).  Running the module as a script logs
in each configured account and runs a short sequence of calls.
"""
import base64
import io
import json
import os
import re
import sys
import time
import unittest
import urllib.parse
import urllib.request
import webbrowser
from xml.dom import minidom

from weibopy.api import API
from weibopy.api import WeibopError
from weibopy.auth import BasicAuthHandler, OAuthHandler


class Test(unittest.TestCase):
    """Thin wrapper around ``weibopy.api.API`` that prints what each call returns.

    ``self.obj`` always holds the most recent API object being inspected;
    :meth:`getAtt` reads attributes from it.

    NOTE(review): ``auth()``, ``basicAuth()`` and ``setAccessToken()`` store
    the auth handler in ``self.auth``, which shadows the ``auth`` *method* on
    that instance.  ``auth()`` can therefore be called only once per
    instance.  Left unchanged because both names may be used by callers.
    """

    consumer_key = ""
    consumer_secret = ""

    def __init__(self, app_key, app_secret):
        """Store the application key/secret used for OAuth."""
        # TestCase keeps internal state that its own __init__ sets up.
        super().__init__()
        self.consumer_key = app_key
        self.consumer_secret = app_secret

    # ------------------------------------------------------------------
    # Attribute helpers
    # ------------------------------------------------------------------
    def getAtt(self, key):
        """Return attribute ``key`` of ``self.obj``, or ``''`` on any failure."""
        try:
            return self.obj.__getattribute__(key)
        except Exception as e:  # best-effort: report and fall back to ''
            print(e)
            return ''

    def getAttValue(self, obj, key):
        """Return attribute ``key`` of ``obj``, or ``''`` on any failure."""
        try:
            return obj.__getattribute__(key)
        except Exception as e:  # best-effort: report and fall back to ''
            print(e)
            return ''

    def _show(self, label, obj, second="text"):
        """Make ``obj`` current, print ``label---<id>:<second attr>``, return the id."""
        self.obj = obj
        first = self.getAtt("id")
        other = self.getAtt(second)
        print(label + "---" + str(first) + ":" + str(other))
        return first

    def _show_ids(self, label, result):
        """Print the ``ids`` and cursor attributes of ``result``; return ``ids``."""
        self.obj = result
        ids = self.getAtt("ids")
        next_cursor = self.getAtt("next_cursor")
        previous_cursor = self.getAtt("previous_cursor")
        print(label + "---" + str(ids) + ":next_cursor" + str(next_cursor)
              + ":previous_cursor" + str(previous_cursor))
        return ids

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------
    # 1. Oldest authentication scheme; no longer supported by the service.
    def basicAuth(self, source, username, password):
        """Set up ``self.api`` using HTTP basic authentication."""
        self.authType = 'basicauth'
        self.auth = BasicAuthHandler(username, password)
        self.api = API(self.auth, source=source)

    # 2. OAuth scheme currently supported by Sina.  The upstream sample
    #    asked the user to type the verifier; here it is parsed from the
    #    authorization response automatically.
    def auth(self, username, password):
        """Run the OAuth flow and set up ``self.api``.

        Returns the access token, or ``None`` (after printing a hint) when
        the consumer key or secret is empty.
        """
        if not self.consumer_key:
            print("Please set consumer_key")
            return None
        if not self.consumer_secret:  # original re-tested consumer_key here
            print("Please set consumer_secret")
            return None

        oauth_format = "json"  # the "xml" alternative is kept in _fetch_verifier
        self.auth = OAuthHandler(self.consumer_key, self.consumer_secret)
        auth_url = self.auth.get_authorization_url()
        # SECURITY NOTE: the password travels in the URL query string.
        # Credentials are percent-encoded so reserved characters such as
        # '&', '=' or '+' cannot corrupt the query string.
        auth_url = (auth_url
                    + '&oauth_callback=' + oauth_format
                    + '&userId=' + urllib.parse.quote(username, safe='')
                    + '&passwd=' + urllib.parse.quote(password, safe=''))

        verifier = self._fetch_verifier(auth_url, oauth_format)
        access_token = self.auth.get_access_token(verifier)
        self.api = API(self.auth)
        return access_token

    @staticmethod
    def _fetch_verifier(auth_url, oauth_format):
        """Fetch ``auth_url`` and extract ``oauth_verifier`` from the XML/JSON reply."""
        with urllib.request.urlopen(auth_url) as response:
            if oauth_format == "xml":
                root = minidom.parse(response).documentElement
                node = root.getElementsByTagName("oauth_verifier")[0]
                return node.childNodes[0].data
            payload = json.loads(response.read().decode('utf-8'))
            return payload["oauth_verifier"]

    # 3. Reuse a previously authorized token/secret pair instead of
    #    authenticating again.
    def setAccessToken(self, key, secret):
        """Set up ``self.api`` from a saved access token key and secret."""
        self.auth = OAuthHandler(self.consumer_key, self.consumer_secret)
        self.auth.setAccessToken(key, secret)
        self.api = API(self.auth)

    # ------------------------------------------------------------------
    # Statuses
    # ------------------------------------------------------------------
    def update(self, message):
        """Post a status with fixed coordinates; return its id as ``int``."""
        status = self.api.update_status(lat='39', long='110', status=message)
        return int(self._show("update", status))

    # Only 'image/gif', 'image/jpeg' and 'image/png' are supported.
    def upload(self, filename, message):
        """Upload an image together with a status message."""
        self.obj = self.api.upload(filename, status=message)
        status_id = self.getAtt("id")
        text = self.getAtt("text")
        self.obj = self.getAtt("user")  # the current object ends up being the author
        print("upload,id=" + str(status_id) + ",text=" + text
              + ",image_name=" + filename)

    def uploadExt(self, filename, message, latPosition, longPosition):
        """Upload an image with a status message and explicit coordinates."""
        self.obj = self.api.upload(filename, status=message,
                                   lat=latPosition, long=longPosition)
        status_id = self.getAtt("id")
        text = self.getAtt("text")
        self.obj = self.getAtt("user")
        profile_image_url = self.getAtt("profile_image_url")
        print("upload,id=" + str(status_id) + ",text=" + text
              + ",profile_image_url=" + profile_image_url)

    def destroy_status(self, id):
        """Delete the status with the given id."""
        self._show("destroy", self.api.destroy_status(id))

    # Author's understanding: a reply to comment ``cid`` on status ``id``.
    def reply(self, id, cid, message):
        """Reply to comment ``cid`` of status ``id``."""
        self._show("reply", self.api.reply(id, cid, message))

    def repost(self, id, message):
        """Repost status ``id``, with ``message`` as the attached comment."""
        self._show("repost", self.api.repost(id, message))

    # ------------------------------------------------------------------
    # Comments
    # ------------------------------------------------------------------
    def comments(self, id):
        """Print the comments of status ``id``."""
        for comment in self.api.comments(id=id):
            self._show("comments", comment)

    def comments_by_me(self, number):
        """Print up to ``number`` comments sent by the current user.

        Returns the id of the last comment printed, or ``''`` if none.
        """
        last_id = ''
        for comment in self.api.comments_by_me(count=number):
            self.obj = comment
            last_id = self.getAtt("id")
            text = self.getAtt("text")
            created_at = self.getAtt("created_at")
            print('comments_by_me,id=' + str(last_id) + ',text=' + text
                  + ',created_at=' + str(created_at))
        return last_id

    def comment(self, mid, comment_content):
        """Comment on status ``mid`` with ``comment_content``."""
        self.obj = self.api.comment(id=mid, comment=comment_content)
        comment_id = self.getAtt("id")
        text = self.getAtt("text")
        print("mid: " + str(comment_id) + " test:" + text)

    def comment_destroy(self, mid):
        """Delete the current user's comment ``mid``."""
        self._show("comment_destroy", self.api.comment_destroy(mid))

    def counts(self, ids='3363780346175114,3366928917792687,1877120192'):
        """Print comment and repost counts for a comma-separated list of status ids."""
        for count in self.api.counts(ids=ids):
            self.obj = count
            mid = self.getAtt("id")
            comments = self.getAtt("comments")
            rt = self.getAtt("rt")
            # The original printed the copy-pasted label "mentions---" here.
            print("counts---" + str(mid) + ":" + str(comments) + ":" + str(rt))

    # ------------------------------------------------------------------
    # Direct messages (the original notes these need an app key with
    # advanced authorization)
    # ------------------------------------------------------------------
    def direct_message(self):
        """Print received direct messages; return the last id, or ``''``."""
        last_id = ''
        for msg in self.api.direct_messages():
            last_id = self._show("direct_message", msg)
        return last_id

    def sent_direct_messages(self):
        """Print the direct messages sent by the current user."""
        for msg in self.api.sent_direct_messages():
            self._show("sent_direct_messages", msg)

    def new_direct_message(self, uid=1114365581, text=None):
        """Send a direct message to ``uid``.

        With no arguments this sends the original timestamped test text to
        the original hard-coded recipient.
        """
        if text is None:
            # "\u6d4b\u8bd5" is the two-character word the original literal
            # contained; escaped so the source stays ASCII under the gbk
            # coding declaration.
            text = 'directMessages--test-\u6d4b\u8bd5-' + str(time.time())
        self._show("new_direct_message",
                   self.api.new_direct_message(id=uid, text=text))

    def destroy_direct_message(self, id):
        """Delete direct message ``id``."""
        self._show("destroy_direct_message",
                   self.api.destroy_direct_message(id))

    # ------------------------------------------------------------------
    # Favorites
    # ------------------------------------------------------------------
    def favorites(self):
        """Print the current user's favorites."""
        for status in self.api.favorites(id=2):
            self._show("favorites", status)

    def create_favorite(self, id):
        """Add status ``id`` to favorites."""
        self._show("create_favorite", self.api.create_favorite(id))

    def destroy_favorite(self, id):
        """Remove status ``id`` from favorites."""
        self._show("destroy_favorite", self.api.destroy_favorite(id))

    # ------------------------------------------------------------------
    # Friendships
    # ------------------------------------------------------------------
    def create_friendship(self, fid):
        """Follow user ``fid``."""
        self._show("create_friendship",
                   self.api.create_friendship(id=fid), "screen_name")

    def destroy_friendship(self, fid):
        """Unfollow user ``fid``."""
        self._show("destroy_friendship",
                   self.api.destroy_friendship(id=fid), "screen_name")

    def exists_friendship(self, user_a_id, user_b_id):
        """Print whether user A follows user B."""
        self.obj = self.api.exists_friendship(user_a=user_a_id,
                                              user_b=user_b_id)
        friends = self.getAtt("friends")
        print("exists_friendship--- " + str(friends))

    # The original author notes this call "does not seem useful".
    def show_friendship(self, uid):
        """Print id and screen name of each entry returned for ``uid``."""
        for entry in self.api.show_friendship(target_id=uid):
            self._show("show_friendship", entry, "screen_name")

    def is_my_attention(self, uid):
        """Return whether the current user follows ``uid``.

        Reads the module global ``myuserid``, which ``show_self_id()`` sets;
        call that first.
        """
        self.obj = self.api.exists_friendship(user_a=myuserid, user_b=uid)
        friends = self.getAtt("friends")
        print(str(friends))
        return friends

    def is_my_follower(self, uid):
        """Return whether ``uid`` follows the current user.

        Reads the module global ``myuserid``, which ``show_self_id()`` sets;
        call that first.
        """
        self.obj = self.api.exists_friendship(user_a=uid, user_b=myuserid)
        friends = self.getAtt("friends")
        print(str(friends))
        return friends

    def show_self_id(self, target_id=None):
        """Print and return the logged-in user's id, storing it in ``myuserid``.

        ``target_id`` must be an existing user id; the first element of the
        ``show_friendship`` result is taken to describe the current user.
        When omitted, the legacy module global ``opposite_id`` is used (a
        ``NameError`` results if that global was never defined).
        """
        global myuserid
        if target_id is None:
            target_id = opposite_id
        pair = self.api.show_friendship(target_id=target_id)
        self.obj = pair[0]
        myuserid = self.getAtt("id")
        screen_name = self.getAtt("screen_name")
        print("\nuid: " + str(myuserid) + " name: " + str(screen_name))
        return myuserid

    def show_opposite_id(self, opposite_id):
        """Print the screen name for ``opposite_id`` and return the id unchanged."""
        pair = self.api.show_friendship(target_id=opposite_id)
        self.obj = pair[1]
        screen_name = self.getAtt("screen_name")
        print("uid: " + str(opposite_id) + " name: " + str(screen_name))
        return opposite_id

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------
    def get_user_nickname(self, opposite_id):
        """Return the screen name of user ``opposite_id``."""
        self.obj = self.api.get_user(user_id=opposite_id)
        return self.getAtt("screen_name")

    def get_friends_count(self, uid):
        """Return the ``friends_count`` attribute of user ``uid``."""
        self.obj = self.api.get_user(user_id=uid)
        return self.getAtt("friends_count")

    def get_user_info_list(self, uid):
        """Return the raw ``get_user`` result for ``uid``."""
        return self.api.get_user(user_id=uid)

    def friends(self):
        """Print the users the current user follows (one API page)."""
        for user in self.api.friends():
            self._show("friends", user, "screen_name")

    # ------------------------------------------------------------------
    # Timelines
    # ------------------------------------------------------------------
    def friends_timeline(self):
        """Print the two newest statuses of followed users.

        Returns a list of ``{'mid': ..., 'txt': ...}`` dicts.
        """
        collected = []
        for status in self.api.friends_timeline(count=2, page=1):
            self.obj = status
            mid = self.getAtt("id")
            text = self.getAtt("text")
            collected.append({'mid': mid, 'txt': text})
            print("friends_timeline---" + str(mid) + ":" + text)
        return collected

    def comments_timeline(self, count=2, page=1):
        """Print comments sent and received by the current user.

        The file used to define this method twice; only the later
        definition (``count=2, page=1``) was ever in effect, so those are
        the defaults.
        """
        for comment in self.api.comments_timeline(count=count, page=page):
            self._show("comments_timeline", comment)

    def user_timeline(self, userid, amount):
        """Print up to ``amount`` statuses posted by ``userid``."""
        for status in self.api.user_timeline(uid=userid, count=amount):
            self._show("user_timeline", status)

    def user_timeline_dict(self, amount):
        """Print the first page of the current user's statuses.

        Returns a list of ``{'id': ..., 'text': ...}`` dicts.
        """
        return self.user_timeline_pages(amount, 1)

    def user_timeline_pages(self, amount, pagenumber):
        """Print page ``pagenumber`` (``amount`` per page) of the user's statuses.

        Returns a list of ``{'id': ..., 'text': ...}`` dicts.
        """
        collected = []
        for status in self.api.user_timeline(count=amount, page=pagenumber):
            self.obj = status
            mid = self.getAtt("id")
            text = self.getAtt("text")
            collected.append({'id': mid, 'text': text})
            print("user_timeline---" + str(mid) + ":" + text)
        return collected

    def public_timeline(self):
        """Print the two newest public statuses."""
        for status in self.api.public_timeline(count=2, page=1):
            self._show("public_timeline", status)

    def public_timeline_mid(self):
        """Return the id of the last of the two newest public statuses.

        Returns ``''`` when the timeline is empty (the original raised
        ``UnboundLocalError`` in that case).
        """
        mid = ''
        for status in self.api.public_timeline(count=2, page=1):
            self.obj = status
            mid = self.getAtt("id")
        return mid

    # ------------------------------------------------------------------
    # Id lists
    # ------------------------------------------------------------------
    def friends_ids(self):
        """Print and return the ids of followed users."""
        return self._show_ids("friends_ids", self.api.friends_ids())

    def friends_ids_ex(self):
        """Print id and screen name for every entry of the followed-ids list.

        NOTE(review): each entry is read via ``getAtt("id")`` /
        ``getAtt("screen_name")``.  If the entries are plain ids rather than
        user objects, both lookups fail and empty values are printed --
        confirm against the API before relying on this.
        """
        self.obj = self.api.friends_ids()
        for entry in self.getAtt("ids"):
            self._show("friends", entry, "screen_name")

    def followers_ids(self):
        """Print and return the ids of the current user's followers."""
        return self._show_ids("followers_ids", self.api.followers_ids())

    def mentions(self):
        """Print the statuses that mention the current user."""
        for status in self.api.mentions():
            self._show("mentions", status)

    # ------------------------------------------------------------------
    # Account
    # ------------------------------------------------------------------
    # Privacy fields, as described by the original author:
    #   comment   : who may comment on this account (0 everyone, 1 people I follow)
    #   dm        : who may send direct messages (0 everyone, 1 people I follow)
    #   real_name : may others find me by real name (0 allowed, 1 not allowed)
    #   geo       : may statuses store/show location (0 allowed, 1 not allowed)
    #   badge     : badge display state (-1 private, 0 public)
    def get_privacy(self):
        """Print the account's privacy settings as one concatenated string."""
        self.obj = self.api.get_privacy()
        fields = ("comment", "dm", "real_name", "geo", "badge")
        print("privacy---" + "".join(str(self.getAtt(f)) for f in fields))

    def update_privacy(self):
        """Set the ``comment`` privacy option to 0."""
        self.api.update_privacy(comment=0)

    # Rate limits per user per application, as described by the original
    # author (ordinary authorization):
    #   overall 150/hour; posting 30/hour; commenting 60/hour;
    #   following 60/hour and 200/day.
    # Example reply:
    #   {
    #     "hourly_limit": 150,
    #     "reset_time_in_seconds": 1264994122,
    #     "reset_time": "Mon Feb 01 03:15:22 +0800 2010",
    #     "remaining_hits": 150
    #   }
    def rate_limit_status(self):
        """Print the four rate-limit fields, one per line."""
        self.obj = self.api.rate_limit_status()
        for field in ("hourly_limit", "reset_time_in_seconds",
                      "reset_time", "remaining_hits"):
            print(field + "---" + str(self.getAtt(field)))

    def unread(self):
        """Print unread counts: mentions, comments, followers, direct messages."""
        self.obj = self.api.unread()
        mentions = self.getAtt("mentions")
        comments = self.getAtt("comments")
        followers = self.getAtt("followers")
        dm = self.getAtt("dm")
        print("mentions---" + str(mentions) + ":" + str(comments) + ":"
              + str(followers) + ":" + str(dm))

    def update_profile_image(self, filename):
        """Replace the profile image with the file ``filename``."""
        self.obj = self.api.update_profile_image(filename)
        uid = self.getAtt("id")
        profile_image_url = self.getAtt("profile_image_url")
        print("update,uid=" + str(uid) + ",profile_image_url="
              + profile_image_url)

    # ------------------------------------------------------------------
    # Tags
    # ------------------------------------------------------------------
    def tag_create(self, message):
        """Add user tags.

        NOTE(review): ``message`` is UTF-8 *encoded* (bytes) before it is
        handed to the API, as in the original -- confirm the client accepts
        bytes under Python 3.
        """
        message = message.encode("utf-8")
        for tag in self.api.tag_create(tags=message):
            self.obj = tag
            # str() guards against non-string ids breaking the concatenation.
            print("tag_create---" + str(self.getAtt("tagid")))

    def tag_suggestions(self):
        """Print tags suggested for the current user."""
        for tag in self.api.tag_suggestions():
            self.obj = tag
            tag_id = self.getAtt("id")
            value = self.getAtt("value")
            print("tag_suggestions---" + str(tag_id) + ":" + str(value))

    def tag_destroy(self, tag_id):
        """Delete tag ``tag_id``."""
        self.obj = self.api.tag_destroy(tag_id)
        print("tag_destroy---" + str(self.getAtt("result")))

    def tag_destroy_batch(self, tag_ids):
        """Delete several tags at once."""
        for tag in self.api.tag_destroy_batch(tag_ids):
            self.obj = tag
            print("tag_destroy_batch---" + str(self.getAtt("tagid")))

    # ------------------------------------------------------------------
    # Trends
    # ------------------------------------------------------------------
    def trends(self, uid):
        """Print the topics of user ``uid`` (first page, 20 entries)."""
        for trend in self.api.trends(user_id=uid, count=20, page=1):
            self.obj = trend
            num = self.getAtt("num")
            trend_id = self.getAtt("trend_id")
            hotword = self.getAtt("hotword")
            print("trends---" + str(num) + ":" + str(hotword) + ":"
                  + str(trend_id))

    # The original author notes Sina provided no sample code for this call.
    def trends_statuses(self, message):
        """Print statuses under the topic ``message`` (sent UTF-8 encoded)."""
        message = message.encode("utf-8")
        for status in self.api.trends_statuses(message):
            self.obj = status
            status_id = self.getAtt("id")
            text = self.getAtt("text")
            print("ToReader---" + str(status_id) + ":" + text)

    def trends_follow(self, message):
        """Follow the topic ``message`` (sent UTF-8 encoded)."""
        self.api.trends_follow(message.encode("utf-8"))

    def trends_destroy(self, id):
        """Stop following topic ``id``."""
        self.api.trends_destroy(id)

    def _print_trends(self, result, prefix, separator, limit=None):
        """Print ``prefix<query>separator<name>`` for the trends in ``result``.

        The ``trends`` attribute is indexed by the local date (YYYY-MM-DD)
        of the ``as_of`` attribute, as the original code did.  At most
        ``limit`` lines are printed when ``limit`` is given.  The original
        loops rebound ``query`` to a string while still indexing it as the
        mapping, which would fail on a second pass; the lookup is therefore
        done exactly once.
        """
        self.obj = result
        trends = self.getAtt("trends")
        as_of = self.getAtt("as_of")
        if not trends:
            return
        date_key = time.strftime('%Y-%m-%d', time.localtime(as_of))
        for shown, line in enumerate(trends[date_key]):
            if limit is not None and shown >= limit:
                return
            print(prefix + line["query"] + separator + line["name"])

    def trends_daily(self):
        """Print the hot topics of the last day."""
        self._print_trends(self.api.trends_daily(base_app=0),
                           "trends_daily---Query:", ",Name:")

    def trends_weekly(self):
        """Print the hot topics of the current week."""
        self._print_trends(self.api.trends_weekly(base_app=0),
                           "trends_weekly---Query:", ",Name:")

    def trends_hourly(self, number=10):
        """Print at most ``number`` hourly hot topics.

        The file used to define this method twice; the surviving definition
        required ``number`` but ignored it and always stopped after 10
        entries.  ``number`` is now honoured and defaults to 10.
        """
        self._print_trends(self.api.trends_hourly(base_app=0),
                           "trends_hourly--- Query:", ", Name:",
                           limit=number)


def main():
    """Log in each configured account and run a short sequence of API calls.

    NOTE(review): ``userInfo`` is not imported anywhere in this file; it
    must be made available (a project module exposing ``user_info_temp``)
    before this function can run.
    """
    print(base64.b64decode(
        b'Q29weXJpZ2h0IChjKSAyMDEyIERvdWN1YmUgSW5jLiBBbGwgcmlnaHRzIHJlc2VydmVkLg=='
    ).decode())

    for user_index in userInfo.user_info_temp:
        try:
            user_uid = user_index['user']['uid']
            login_account = user_index['login']['account']
            login_password = user_index['login']['password']
            app_key = user_index['app']['key']
            app_secret = user_index['app']['secret']

            test = Test(app_key, app_secret)
            test.auth(login_account, login_password)
            # Any existing user id works as the friendship target; the
            # configured uid is used here -- TODO confirm it is suitable.
            test.show_self_id(user_uid)
            test.direct_message()
            # The original call omitted the required user id argument.
            test.user_timeline(user_uid, 20)
            test.comments(3386439134314734)
            print()
        except WeibopError as err:
            print(">>>>>> WeibopError: " + err.reason)
        except Exception as error:  # top-level boundary: report and continue
            print(">>>>>> PythonError " + str(error))


if __name__ == '__main__':
    main()