Commit 1079e92d by 据说甜蜜呢

Initial code commit

parent c1d584a0
namespace Spider
{
public static class AppEnvironment
{
public static string Name { get; set; }
public static bool IsDevelopment() => Name == "Development";
public static bool IsStaging() => Name == "Staging";
}
}
using Microsoft.Extensions.DependencyInjection;
using Spider.Core;
using Spider.Core.DataProvider;
using Spider.Core.Pipeline;
using Spider.Infrastructure;
using Spider.Services;
namespace Spider
{
public class Bootstrapper
{
public static void RegisterServices(IServiceCollection container)
{
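// Pipeline wiring: the engine fans work out to processors, and each processor runs Extractor -> Transformer -> Loader.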
container.AddScoped<IWebDriverProvider, WebDriverProvider>();
container.AddScoped<IDataProvider<Production>, ProductionDataProvider>();
container.AddScoped<IDataProvider<User>, UserDataProvider>();
container.AddScoped<IExtractable, Extractor>();
container.AddScoped<ITransformable, Transformer>();
container.AddScoped<ILoadable, Loader>();
container.AddScoped<IExecutor, BasicExecutor>();
container.AddScoped<IDataCollectProcessor, DataCollectBatchProcessor>();
container.AddScoped<IDataCollectEngine, DataCollectEngine>();
}
}
}
using Dora.DynamicProxy;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Options;
using System.Threading.Tasks;
namespace Spider.Core.Caching
{
public class CacheInterceptor
{
private readonly IMemoryCache _cache;
private readonly MemoryCacheEntryOptions _options;
public CacheInterceptor(IMemoryCache cache, IOptions<MemoryCacheEntryOptions> optionsAccessor)
{
_cache = cache;
_options = optionsAccessor.Value;
}
public async Task InvokeAsync(InvocationContext context)
{
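// Cache-aside: return the cached value when present; otherwise invoke the intercepted method and cache its return value.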
var key = new CacheKey(context.Method, context.Arguments);
if (_cache.TryGetValue(key, out var value))
{
context.ReturnValue = value;
}
else
{
await context.ProceedAsync();
_cache.Set(key, context.ReturnValue, _options);
}
}
}
}
using Spider.Infrastructure;
using System.Collections.Generic;
using System.Reflection;
namespace Spider.Core.Caching
{
/// <summary>
/// Cache key used for caching method return values
/// </summary>
public class CacheKey
{
public MethodBase Method { get; }
public object[] InputArguments { get; }
public CacheKey(MethodBase method, object[] arguments)
{
this.Method = method;
this.InputArguments = arguments;
}
public override int GetHashCode()
{
var objs = new List<object> { Method };
objs.AddRange(InputArguments);
return HashCodeHelper.CombineHashCodes(objs);
}
public override bool Equals(object obj)
{
if (ReferenceEquals(this, obj)) return true;
// Hash codes can collide, so compare the method and every argument instead of the hash.
if (!(obj is CacheKey other) || !Equals(Method, other.Method)) return false;
if (InputArguments.Length != other.InputArguments.Length) return false;
for (var i = 0; i < InputArguments.Length; i++)
if (!Equals(InputArguments[i], other.InputArguments[i])) return false;
return true;
}
}
}
using Dora.Interception;
namespace Spider.Core.Caching
{
public class CacheReturnValueAttribute : InterceptorAttribute
{
public override void Use(IInterceptorChainBuilder builder) => builder.Use<CacheInterceptor>(Order);
}
}
using Microsoft.Extensions.Logging;
using Spider.Core.DataProvider;
using Spider.Core.Pipeline;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Spider.Core
{
/// <summary>
/// Production batch processor.
/// </summary>
public class DataCollectBatchProcessor : IDataCollectProcessor
{
private readonly IExtractable _extractor;
private readonly ITransformable _transformer;
private readonly ILoadable _loader;
private readonly ILogger _logger;
private readonly IDataProvider<Production> _dataProvider;
protected static class RunningState
{
internal const int Idle = 0;
internal const int Running = Idle + 1;
}
private volatile int _running;
public DataCollectBatchProcessor(
IExtractable extractor,
ITransformable transformer,
ILoadable loader,
ILogger<DataCollectBatchProcessor> logger,
IDataProvider<Production> dataProvider)
{
_extractor = extractor;
_transformer = transformer;
_loader = loader;
_logger = logger;
_dataProvider = dataProvider;
}
public async Task RunAsync<TState>(TState state)
{
if (!(state is DataCollectContext context))
return;
if (Interlocked.CompareExchange(ref _running, RunningState.Running, RunningState.Idle) != RunningState.Idle)
{
_logger.LogError("Processor is already running.");
return;
}
var start = 0;
// Smooth out the large batch assigned by the engine by processing it in small batches.
var largeBatchSize = context.LargeBatchSize;
var smallBatchSize = context.SmallBatchSize;
var round = context.SmallBatch <= 0 ? 1 : context.SmallBatch;
var batchSize = largeBatchSize > smallBatchSize ? smallBatchSize : largeBatchSize;
using (_logger.BeginScope($"Data Collect Batch Processor {context}"))
{
try
{
while (start <= largeBatchSize)
{
// Double check: stop when the running flag has been reset.
if (_running != RunningState.Running)
break;
_logger.LogInformation(
$"{Environment.NewLine}Small batch: round {round} [{start}-{start + batchSize}] beginning...");
var productions = await _dataProvider.GetAsync(context);
await ProcessingAsync(context, productions);
_logger.LogInformation($"Small batch: round {round} [{start}-{start + batchSize}] ended.");
start += batchSize;
context.SmallBatch = round++;
}
}
catch (Exception e)
{
_logger.LogError($"{nameof(RunAsync)} has an unknown exception: {e}.");
}
finally
{
Interlocked.Exchange(ref _running, RunningState.Idle);
}
}
}
private async Task ProcessingAsync(DataCollectContext context,
IEnumerable<Production> productions)
{
if (productions == null)
return;
var index = 0;
foreach (var production in productions)
{
var trace = $"[index: {++index}]";
_logger.LogInformation($"{Environment.NewLine}{trace}Extracting");
var payloads = _extractor.Extract(production);
_logger.LogInformation($"{trace}Extracted");
if (payloads == null || !payloads.Any())
continue;
_logger.LogInformation($"{trace}Transforming");
await _transformer.TransformAsync(context, payloads);
_logger.LogInformation($"{trace}Transformed");
_logger.LogInformation($"{trace}Loading");
await _loader.LoadAsync(payloads);
_logger.LogInformation($"{trace}Loaded");
}
}
}
}
namespace Spider.Core
{
public class DataCollectContext
{
/// <summary>
/// Processing checkpoint
/// </summary>
public int CheckPointId { get; set; }
/// <summary>
/// User checkpoint
/// </summary>
public int UserCheckPointId { get; set; }
/// <summary>
/// Total size to process
/// </summary>
public int TotalSizeProcessed { get; set; }
/// <summary>
/// Large batch: current round
/// </summary>
/// <remarks>Each large batch starts its own WebDriver.</remarks>
public int LargeBatch { get; set; } = 1;
/// <summary>
/// Large batch size
/// </summary>
public int LargeBatchSize { get; set; }
/// <summary>
/// Small batch: current round
/// </summary>
public int SmallBatch { get; set; } = 1;
/// <summary>
/// Small batch size
/// </summary>
public int SmallBatchSize { get; set; } = 100;
/// <summary>
/// Total number of rounds
/// </summary>
public int Rounds => TotalSizeProcessed % LargeBatchSize == 0 ? TotalSizeProcessed / LargeBatchSize : (TotalSizeProcessed / LargeBatchSize) + 1;
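// Illustrative example (assumed numbers): TotalSizeProcessed = 1000 with LargeBatchSize = 300 gives Rounds = 4
// (three full large batches of 300 plus one partial batch of 100).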
public override string ToString()
{
return $"{{CheckPointId:{CheckPointId},TotalSizeProcessed:{TotalSizeProcessed}, LargeBatch:{LargeBatch}, LargeBatchSize:{LargeBatchSize}, SmallBatch:{SmallBatch}, SmallBatchSize:{SmallBatchSize}, Rounds:{Rounds}}}";
}
public DataCollectContext Clone()
{
return new DataCollectContext()
{
LargeBatch = this.LargeBatch,
LargeBatchSize = this.LargeBatchSize,
SmallBatch = this.SmallBatch,
SmallBatchSize = this.SmallBatchSize,
TotalSizeProcessed = this.TotalSizeProcessed,
CheckPointId = this.CheckPointId
};
}
}
}
namespace Spider.Core
{
public class DataCollectContextBuilder
{
private DataCollectorOptions _options;
private int _checkPointId;
private int _totalSizeProcessed;
public DataCollectContextBuilder AddCheckPoint(int checkPointId)
{
_checkPointId = checkPointId;
return this;
}
public DataCollectContextBuilder AddTotalSize(int totalSizeProcessed)
{
_totalSizeProcessed = totalSizeProcessed;
return this;
}
public DataCollectContextBuilder AddOptions(DataCollectorOptions options)
{
_options = options;
return this;
}
public DataCollectContext Build()
{
var context = new DataCollectContext
{
CheckPointId = _checkPointId,
TotalSizeProcessed = _totalSizeProcessed,
LargeBatchSize = (int)(_options.ProcessFactor * _totalSizeProcessed)
};
return context;
}
}
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Spider.Core.DataProvider;
using Spider.Core.Exceptions;
using Spider.Infrastructure;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Spider.Core
{
public class DataCollectEngine : IDataCollectEngine
{
private volatile int _started;
private readonly IExecutor _executor;
private readonly DataCollectorOptions _options;
private readonly IDataProvider<Production> _dataProvider;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<DataCollectEngine> _logger;
public DataCollectEngine(
IExecutor executor,
IOptions<DataCollectorOptions> options,
IDataProvider<Production> dataProvider,
ILogger<DataCollectEngine> logger,
IServiceProvider serviceProvider)
{
_executor = executor;
_dataProvider = dataProvider;
_logger = logger;
_serviceProvider = serviceProvider;
_options = options.Value;
}
public async Task StartAsync()
{
CheckOnlyStartedOnce();
var sizeToProcessed = await _dataProvider.GetCountAsync();
if (sizeToProcessed <= 0)
{
_logger.LogCritical($"[{AppEnvironment.Name}]Engine startup failure: total size to process is {sizeToProcessed}.");
return;
}
var checkPointId = _options.CheckPointId;
if (_options.CheckPointId <= 0)
checkPointId = await _dataProvider.GetCheckPointIdAsync();
var context = new DataCollectContextBuilder()
.AddCheckPoint(checkPointId)
.AddTotalSize(sizeToProcessed)
.AddOptions(_options)
.Build();
await DisruptAndDoWorkAsync(context);
}
private async Task DisruptAndDoWorkAsync(DataCollectContext context)
{
var rounds = context.Rounds;
var workers = new Task[rounds];
var scopes = new IServiceScope[rounds];
using (_logger.BeginScope($"[{AppEnvironment.Name}]Data Collect Engine"))
{
try
{
for (var j = 0; j < rounds; j++)
{
context.LargeBatch = j + 1;
// Keep each scope alive until its worker finishes; disposing it inside the loop
// would tear down the scoped services while the processor is still using them.
scopes[j] = _serviceProvider.CreateScope();
var processor = scopes[j].ServiceProvider.GetRequiredService<IDataCollectProcessor>();
workers[j] = _executor.ExecuteAsync(processor, context.Clone());
}
await Task.WhenAll(workers).ConfigureAwait(false);
}
finally
{
foreach (var scope in scopes)
scope?.Dispose();
}
}
_logger.LogInformation("The work has been done.");
}
private void CheckOnlyStartedOnce()
{
if (Interlocked.Exchange(ref _started, 1) == 1)
{
throw new IllegalStateException($"{nameof(StartAsync)} must only be called once.");
}
}
}
}
namespace Spider.Core
{
public class DataCollectorOptions
{
/// <summary>
/// Target URL
/// </summary>
public string TargetUrl { get; set; }
/// <summary>
/// Processing factor: batch size / total size to process, range (0, 1].
/// </summary>
/// <remarks>
/// 1: no batching; all productions are processed serially.
/// 0.5: productions are processed in 2 large batches in parallel.
/// 0.2: productions are processed in 5 large batches in parallel.
/// And so on...
/// When the division is not exact, one extra large batch is added.
/// </remarks>
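/// <example>
/// For illustration (assumed numbers): 1,000 productions with ProcessFactor = 0.25 give a LargeBatchSize of 250,
/// i.e. 4 large batches processed in parallel.
/// </example>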
public float ProcessFactor { get; set; }
/// <summary>
/// Start processing from a checkpoint
/// </summary>
/// <remarks>
/// Not set: processing starts from the first ID.
/// Set: check the logs and set the progress checkpoint manually.
/// </remarks>
public int CheckPointId { get; set; }
/// <summary>
/// Search result index
/// </summary>
/// <remarks>
/// Which item of the search results to crawl; the first one by default.
/// </remarks>
public int SearchResultIndex { get; set; } = 1;
/// <summary>
/// Search timeout in seconds; default: 10s.
/// </summary>
public int SearchTimeOutSeconds { get; set; } = 10;
public override string ToString()
{
return $"{{TargetUrl: {TargetUrl},ProcessFactor: {ProcessFactor}}}";
}
}
}
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Spider.Core.DataProvider
{
public interface IDataProvider<T>
{
/// <summary>
/// Get a batch of data
/// </summary>
/// <returns></returns>
Task<IEnumerable<T>> GetAsync(DataCollectContext context);
/// <summary>
/// Get the checkpoint ID
/// </summary>
/// <returns></returns>
Task<int> GetCheckPointIdAsync();
/// <summary>
/// Get the total count
/// </summary>
/// <returns></returns>
Task<int> GetCountAsync();
}
}
namespace Spider.Core.DataProvider
{
public class Production
{
public int Id { get; set; }
public string ShortName { get; set; }
}
}
using Dapper;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
using Spider.Core.Caching;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Spider.Core.DataProvider
{
public class ProductionDataProvider : IDataProvider<Production>
{
private readonly MySqlOptions _options;
private readonly ILogger _logger;
public ProductionDataProvider(
IOptions<MySqlOptions> options,
ILogger<ProductionDataProvider> logger)
{
_logger = logger;
_options = options.Value;
}
public async Task<IEnumerable<Production>> GetAsync(DataCollectContext context)
{
var sql = $"SELECT `ProductionID` `Id`,`ShowProductionName` `ShortName` FROM `tproduction` WHERE `ProductionID` > @LargeBatch LIMIT @BatchSize;";
var batchSize = context.SmallBatchSize;
var largeBatchLastId = context.CheckPointId + (context.LargeBatch - 1) * batchSize;
try
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
var result = await connection.QueryAsync<Production>(sql, new { LargeBatch = largeBatchLastId, BatchSize = batchSize });
return result;
}
}
catch (Exception e)
{
_logger.LogError($"{nameof(GetAsync)}[batchSize: {batchSize}, MySqlOptions: {_options}, sql: {sql}] has an unknown exception: {e}");
return default;
}
}
public async Task<int> GetCheckPointIdAsync()
{
var sql = $"SELECT `ProductionID` FROM `tproduction` ORDER BY `ProductionID` LIMIT 1;";
try
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
var result = await connection.ExecuteScalarAsync<int>(sql);
return result;
}
}
catch (Exception e)
{
_logger.LogError($"{nameof(GetCheckPointIdAsync)}[MySqlOptions: {_options}, sql: {sql}] has an unknown exception: {e}");
return default;
}
}
public async Task<int> GetCountAsync()
{
var sql = $"SELECT count(*) FROM `tproduction`;";
try
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
var result = await connection.ExecuteScalarAsync<int>(sql);
return result;
}
}
catch (Exception e)
{
_logger.LogError($"{nameof(GetCountAsync)}[MySqlOptions: {_options}, sql: {sql}] has an unknown exception: {e}");
return default;
}
}
}
}
namespace Spider.Core.DataProvider
{
public class User
{
public int Id { get; set; }
public string Name { get; set; }
public string Avatar { get; set; }
}
}
using Dapper;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
using Spider.Core.Caching;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Spider.Core.DataProvider
{
public class UserDataProvider : IDataProvider<User>
{
private readonly MySqlOptions _options;
private readonly ILogger _logger;
public UserDataProvider(
IOptions<MySqlOptions> options,
ILogger<UserDataProvider> logger)
{
_logger = logger;
_options = options.Value;
}
[CacheReturnValue]
public async Task<IEnumerable<User>> GetAsync(DataCollectContext context)
{
var sql = $"SELECT `UserId` `Id`,`UserName` `Name`,`avatar` `Avatar` FROM `tuser` WHERE `UserId`>@LargeBatch LIMIT @BatchSize;";
var batchSize = context.SmallBatchSize;
var largeBatchLastId = context.UserCheckPointId + (context.LargeBatch - 1) * batchSize;
try
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
var result = await connection.QueryAsync<User>(sql, new { LargeBatch = largeBatchLastId, BatchSize = batchSize });
return result;
}
}
catch (Exception e)
{
_logger.LogError($"{nameof(GetAsync)}[batchSize: {batchSize}, MySqlOptions: {_options}, sql: {sql}] has an unknown exception: {e}");
return default;
}
}
[CacheReturnValue]
public async Task<int> GetCheckPointIdAsync()
{
var sql = $"SELECT `UserId` FROM `tuser` ORDER BY `UserId` LIMIT 1;";
try
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
var result = await connection.ExecuteScalarAsync<int>(sql);
return result;
}
}
catch (Exception e)
{
_logger.LogError($"{nameof(GetCheckPointIdAsync)}[MySqlOptions: {_options}, sql: {sql}] has an unknown exception: {e}");
return default;
}
}
public Task<int> GetCountAsync()
{
throw new NotSupportedException();
}
}
}
using System;
namespace Spider.Core.Exceptions
{
public class IllegalStateException : Exception
{
public IllegalStateException()
{ }
public IllegalStateException(string message)
: base(message)
{ }
public IllegalStateException(Exception exception)
: base(string.Empty, exception)
{ }
public IllegalStateException(string message, Exception exception)
: base(message, exception)
{ }
}
}
using System.Threading.Tasks;
namespace Spider.Core
{
public interface IDataCollectEngine
{
Task StartAsync();
}
}
using Spider.Infrastructure;
namespace Spider.Core
{
public interface IDataCollectProcessor : IRunnable
{
}
}
namespace Spider.Core
{
public class MySqlOptions
{
public string ConnectionString { get; set; }
public override string ToString()
{
return $"{{ConnectionString: {ConnectionString}}}";
}
}
}
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using OpenQA.Selenium;
using OpenQA.Selenium.Support.UI;
using Spider.Core.DataProvider;
using Spider.Infrastructure.Pos;
using Spider.Services;
using System;
using System.Collections.Generic;
using System.Linq;
namespace Spider.Core.Pipeline
{
public class Extractor : IExtractable
{
private readonly IWebDriver _driver;
private readonly DataCollectorOptions _options;
private readonly ILogger _logger;
public Extractor(
IWebDriverProvider provider,
IOptions<DataCollectorOptions> options,
ILogger<Extractor> logger)
{
_logger = logger;
_options = options.Value;
_driver = provider.Get(Browsers.Chrome);
Initialize();
}
private void Initialize()
{
_driver.Navigate().GoToUrl(_options.TargetUrl);
}
public List<PayLoad> Extract(Production production)
{
if (production == null)
return null;
_logger.LogInformation($"Production[{production.Id}] begin crawling data...");
try
{
var payloads = DataCollect(production);
if (payloads == null || !payloads.Any())
_logger.LogWarning($"Production[{production.Id}] has no questions & answers data.");
else
return payloads;
}
catch (Exception e)
{
_logger.LogError($"Crawling data has an unknown exception: {e}");
}
_logger.LogInformation($"Production[{production.Id}] Extracted.");
return null;
}
private List<PayLoad> DataCollect(Production production)
{
// Wait for the page to load
_driver.FindElement(By.Id("searchInput")).SendKeys(production.ShortName + Keys.Enter);
// 1. Find the crawl target.
// Wait for the search results.
var searchItem = By.XPath($"//ul[@class='clearfix js_seachResultList'][{_options.SearchResultIndex}]/li[{_options.SearchResultIndex}]/a");
var wait = new WebDriverWait(_driver, TimeSpan.FromSeconds(_options.SearchTimeOutSeconds));
wait.Until(wd => wd.FindElement(searchItem)).Click();
// Q&A tab
//var path = By.XPath(@"//ul[@class='compTab']/li[3]");
//_driver.FindElement(path).Click();
// 2. Crawling data.
// Q&A list (All)
var path = By.XPath(@"//ul[@id='js-faqFilter']/li[1]");
var count = _driver.FindElement(path).GetAttribute("data-total");
if (!int.TryParse(count, out var all) || all <= 0)
return null;
return CrawlingData(production);
}
private List<PayLoad> CrawlingData(Production production)
{
const string path = @"//div[@class='goodsFAQ_panelList']/div[@class='goodsFAQ_panel js-itemPanelCustomerFAQ'][1]//dl[@class='goodsFAQ_customerGroup']";
var elements = _driver.FindElements(By.XPath(path));
if (elements == null || !elements.Any())
return null;
var payloads = new List<PayLoad>();
foreach (var element in elements)
{
var payload = PayloadParse(production, element);
if (payload != null)
payloads.Add(payload);
}
return payloads;
}
private PayLoad PayloadParse(Production production, IWebElement element)
{
try
{
// Question
var path = By.XPath(@".//dt[@class='goodsFAQ_customerTitle']");
var questionEle = element.FindElement(path);
if (questionEle == null) return null;
path = By.XPath(@".//h4[@class='goodsFAQ_customerQuestion']");
var qContent = questionEle.FindElement(path)?.Text;
path = By.XPath(@".//p[@class='goodsFAQ_customerTime']");
var customerTime = questionEle.FindElement(path)?.Text;
var question = QuestionParse(customerTime);
// Answer
path = By.XPath(@".//dd[@class='goodsFAQ_customerAnswer']");
var answerEle = element.FindElement(path);
path = By.XPath(@".//div[@class='goodsFAQ_customerCont']/p[2]");
var aContent = answerEle?.FindElement(path)?.Text;
// The question must be captured; the answer may be empty.
var payload = new PayLoad()
{
Question = new Question()
{
ProductionId = production.Id,
ProductionName = production.ShortName,
Content = qContent,
QuestionerName = question.QuestionerName,
CreateTime = question.CreateTime
}
};
if (!string.IsNullOrEmpty(aContent))
{
payload.Answer = new Answer()
{
Content = aContent
};
}
return payload;
}
catch (Exception e)
{
_logger.LogError($"{nameof(PayloadParse)} has an unknown exception: {e}");
return null;
}
}
private (string QuestionerName, DateTime? CreateTime) QuestionParse(string customerTime)
{
if (string.IsNullOrEmpty(customerTime))
return (string.Empty, null);
var length = customerTime.Length;
var index = customerTime.IndexOf('-') - 3;
var buffer = customerTime.AsSpan();
var questioner = buffer.Slice(0, index).Trim();
var time = buffer.Slice(index, length - index).Trim();
return (questioner.ToString(), DateTime.Parse(time));
}
}
}
using Spider.Core.DataProvider;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Spider.Core.Pipeline
{
public class Transformer : ITransformable
{
private static readonly Random Random = new Random();
private readonly IDataProvider<User> _dataProvider;
public Transformer(IDataProvider<User> dataProvider)
{
_dataProvider = dataProvider;
}
/// <summary>
/// Complete, transform, and clean the data
/// </summary>
public async Task TransformAsync(
DataCollectContext context,
IEnumerable<PayLoad> payLoads)
{
if (context == null ||
payLoads == null ||
!payLoads.Any())
return;
context.UserCheckPointId = await _dataProvider.GetCheckPointIdAsync();
var users = (await _dataProvider.GetAsync(context))?.ToArray();
// Guard against a failed or empty user query; indexing must stay within the users actually returned.
if (users == null || users.Length == 0)
return;
foreach (var payLoad in payLoads)
{
var index = Random.Next(0, users.Length);
var hours = Random.Next(1, 24);
payLoad.Question.QuestionerId = users[index].Id;
// If the user name fetched from the backend is empty, use the crawled user name.
var name = users[index].Name;
payLoad.Question.QuestionerName = string.IsNullOrEmpty(name) ? payLoad.Question.QuestionerName : name;
payLoad.Question.QuestionerAvatar = users[index].Avatar;
payLoad.Question.CreateTime = DateTime.Now.AddHours(hours);
// Randomly mark as audited
if (hours % 2 == 0)
{
payLoad.Question.IsAudited = true;
payLoad.Question.AuditTime = DateTime.Now.AddHours(hours + 24);
}
if (payLoad.Answer == null) continue;
// Increase the answer count
payLoad.Question.AnswerCount = 1;
index = Random.Next(0, users.Length);
hours = Random.Next(48, 100);
payLoad.Answer.AnswererId = users[index].Id;
payLoad.Answer.AnswererName = users[index]?.Name;
payLoad.Answer.AnswererAvatar = users[index].Avatar;
payLoad.Answer.IsBest = hours % 2 == 0; // Randomly mark as the best answer
payLoad.Answer.CreateTime = DateTime.Now.AddHours(hours);
}
}
}
}
using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
using NPoco;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Spider.Core.Pipeline
{
public class Loader : ILoadable
{
private readonly MySqlOptions _options;
public Loader(IOptions<MySqlOptions> options)
{
_options = options.Value;
}
public async Task LoadAsync(IEnumerable<PayLoad> payloads)
{
if (payloads == null ||
!payloads.Any())
return;
using (var db = new Database(_options.ConnectionString, DatabaseType.MySQL, new MySqlClientFactory()))
using (var trans = db.GetTransaction())
{
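// NPoco transaction scope: the inserted rows are committed only when trans.Complete() is called;
// disposing the transaction without completing it rolls the batch back.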
foreach (var payLoad in payloads)
{
if (payLoad.Question == null) continue;
var result = await db.InsertAsync(payLoad.Question);
if (payLoad.Answer == null ||
!int.TryParse(result?.ToString(), out var questionId))
continue;
payLoad.Answer.QuestionId = questionId;
await db.InsertAsync(payLoad.Answer);
}
trans.Complete();
}
}
}
}
using Spider.Core.DataProvider;
using System.Collections.Generic;
namespace Spider.Core.Pipeline
{
public interface IExtractable
{
List<PayLoad> Extract(Production production);
}
}
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Spider.Core.Pipeline
{
public interface ILoadable
{
Task LoadAsync(IEnumerable<PayLoad> payloads);
}
}
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Spider.Core.Pipeline
{
public interface ITransformable
{
Task TransformAsync(DataCollectContext context, IEnumerable<PayLoad> payLoads);
}
}
using Spider.Infrastructure.Pos;
namespace Spider.Core.Pipeline
{
public class PayLoad
{
public Question Question { get; set; }
public Answer Answer { get; set; }
}
}
FROM microsoft/dotnet:2.2.0-runtime
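# Set the container time zone to Asia/Shanghai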
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
RUN apt-get update -y && apt-get install
WORKDIR /app
COPY . .
EXPOSE 80
ENTRYPOINT ["dotnet", "Spider.dll"]
namespace Spider
{
public enum Browsers
{
Chrome,
PhantomJs
}
}
using System.Collections.Immutable;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Spider.Infrastructure
{
public class BasicExecutor : IExecutor
{
// Not readonly: the immutable list is swapped atomically via ImmutableInterlocked when threads are tracked.
private ImmutableList<Thread> _threads;
public BasicExecutor()
{
_threads = ImmutableList.Create<Thread>();
}
public override string ToString()
{
return $"{{threads:{DumpThreadInfo()}}}";
}
public Task ExecuteAsync<TState>(IRunnable command, TState state)
{
// Unwrap so the returned task completes when the command finishes,
// not when the async delegate is merely scheduled.
return Task.Factory.StartNew(() => InnerExecuteAsync(command, state), TaskCreationOptions.LongRunning).Unwrap();
}
private async Task InnerExecuteAsync<TState>(IRunnable command, TState state)
{
var workerThread = Thread.CurrentThread;
// ImmutableList.Add/Remove return new lists, so swap the field atomically instead of discarding the result.
ImmutableInterlocked.Update(ref _threads, threads => threads.Add(workerThread));
try
{
await command.RunAsync(state);
}
finally
{
ImmutableInterlocked.Update(ref _threads, threads => threads.Remove(workerThread));
}
}
}
private string DumpThreadInfo()
{
var threads = _threads;
if (threads.IsEmpty)
return "[]";
var sb = new StringBuilder();
foreach (var t in threads)
{
sb.Append("{");
sb.Append("name=").Append(t.Name).Append(",");
sb.Append("id=").Append(t.ManagedThreadId).Append(",");
sb.Append("state=").Append(t.ThreadState);
sb.Append("},");
}
var output = sb.ToString();
// Trim the trailing comma left by the loop.
return $"[{output.Substring(0, output.Length - 1)}]";
}
}
}
using System.Collections.Generic;
namespace Spider.Infrastructure
{
internal static class HashCodeHelper
{
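/// <summary>
/// Combines the hash codes of a sequence of objects with the common 17/23 multiply-add pattern;
/// the unchecked block lets the intermediate arithmetic overflow and wrap around safely.
/// </summary>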
public static int CombineHashCodes(IEnumerable<object> objs)
{
unchecked
{
var hash = 17;
foreach (var obj in objs)
hash = hash * 23 + (obj != null ? obj.GetHashCode() : 0);
return hash;
}
}
}
}
using System.Threading.Tasks;
namespace Spider.Infrastructure
{
public interface IExecutor
{
/// <summary>
/// Execute the given command asynchronously on another thread.
/// </summary>
/// <param name="command">The command to be executed.</param>
/// <param name="state">The state data passed to the command.</param>
/// <returns></returns>
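/// <example>
/// Illustrative call, mirroring how the engine schedules a round: workers[j] = _executor.ExecuteAsync(processor, context.Clone());
/// </example>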
Task ExecuteAsync<TState>(IRunnable command, TState state);
}
}
using System.Threading.Tasks;
namespace Spider.Infrastructure
{
public interface IRunnable
{
Task RunAsync<TState>(TState state);
}
}
using NPoco;
namespace Spider.Infrastructure.Pos
{
/// <summary>
/// Question answer table
/// </summary>
/// <remarks>
/// 1. Only purchased productions can be answered; each question can be answered only once, and answers may be anonymous.
/// 2. Users may delete answers they posted themselves.
/// 3. Users may report answers posted by others.
/// 4. Any logged-in user can mark an answer they see as helpful.
/// 5. The questioner cannot answer their own question.
/// 6. Once posted, an answer can only be deleted by its author, not edited.
/// 7. Answers do not require review.
/// 8. The questioner can select a best answer, which is sorted to the first row of the answer list.
/// </remarks>
[TableName(Constants.Tables.Answers)]
[PrimaryKey("Id", AutoIncrement = true)]
public class Answer : ModelBase
{
/// <summary>
/// Question ID
/// </summary>
public int QuestionId { get; set; }
/// <summary>
/// Answerer ID
/// </summary>
public int AnswererId { get; set; }
/// <summary>
/// Answerer name
/// </summary>
public string AnswererName { get; set; }
/// <summary>
/// Answerer avatar
/// </summary>
public string AnswererAvatar { get; set; }
/// <summary>
/// Best answer
/// </summary>
public bool IsBest { get; set; } = false;
/// <summary>
/// Helpful count
/// </summary>
public int UsefulCount { get; set; }
/// <summary>
/// Comment reply count
/// </summary>
public int CommentReplyCount { get; set; }
}
}
namespace Spider.Infrastructure.Pos
{
public static class Constants
{
public static class Tables
{
public const string Questions = "tqaquestions";
public const string Answers = "tqaanswers";
}
}
}
using System;
namespace Spider.Infrastructure.Pos
{
public class ModelBase
{
/// <summary>
/// ID
/// </summary>
public int Id { get; set; }
/// <summary>
/// Content
/// </summary>
public string Content { get; set; }
/// <summary>
/// Creation time
/// </summary>
public DateTime? CreateTime { get; set; } = DateTime.Now;
/// <summary>
/// Whether posted by an administrator
/// </summary>
public bool IsAdmin { get; set; } = false;
/// <summary>
/// Report type ID
/// </summary>
public int? ReportTypeId { get; set; }
/// <summary>
/// Deletion flag
/// </summary>
public bool IsDeleted { get; set; } = false;
public override string ToString()
{
return $"Id: {Id},Content: {Content},CreateTime: {CreateTime},IsAdmin: {IsAdmin},ReportTypeId: {ReportTypeId},IsDeleted: {IsDeleted}";
}
}
}
using System;
using NPoco;
namespace Spider.Infrastructure.Pos
{
/// <summary>
/// Question table
/// </summary>
/// <remarks>
/// Q&A module: posting questions.
/// 1. Any logged-in user can ask a question about a production they can see, and questions may be anonymous.
/// 2. Questions must first be reviewed by a back-office administrator; the review period is 7 days, and a question is shown in the production's Q&A list once approved or when the period expires.
/// 3. Once a question is approved, the questioner cannot delete or edit it.
/// 4. One question can have multiple answers.
/// 5. The same questioner may ask about the same production multiple times, but the content must differ.
/// 6. Questions posted by others may be reported.
/// 7. Ordering can be adjusted manually via the sort number.
/// </remarks>
[TableName(Constants.Tables.Questions)]
[PrimaryKey("Id", AutoIncrement = true)]
public class Question : ModelBase
{
/// <summary>
/// Questioner ID
/// </summary>
public int QuestionerId { get; set; }
/// <summary>
/// Questioner name
/// </summary>
public string QuestionerName { get; set; }
/// <summary>
/// Questioner avatar
/// </summary>
public string QuestionerAvatar { get; set; }
/// <summary>
/// Production ID
/// </summary>
public int ProductionId { get; set; }
/// <summary>
/// Production name
/// </summary>
public string ProductionName { get; set; }
/// <summary>
/// Production thumbnail
/// </summary>
public string ProductionThumbnail { get; set; }
/// <summary>
/// Audited
/// </summary>
public bool IsAudited { get; set; } = false;
/// <summary>
/// Audit time
/// </summary>
public DateTime? AuditTime { get; set; }
/// <summary>
/// Sort number
/// </summary>
public int? Order { get; set; }
/// <summary>
/// Answer count
/// </summary>
public int AnswerCount { get; set; }
public override string ToString()
{
return $"[QuestionerId: {QuestionerId},QuestionerName: {QuestionerName},QuestionerAvatar: {QuestionerAvatar},ProductionId: {ProductionId},ProductionName: {ProductionName},ProductionThumbnail: {ProductionThumbnail},IsAudited: {IsAudited},AuditTime: {AuditTime},Order: {Order},AnswerCount: {AnswerCount},{base.ToString()}]";
}
}
}
using System;
using System.Threading.Tasks;
namespace Spider.Infrastructure
{
public class ThreadPoolExecutor : IExecutor
{
public Task ExecuteAsync<TState>(IRunnable command, TState state)
{
throw new NotImplementedException();
}
}
}
using Microsoft.Extensions.DependencyInjection;
using Spider.Core;
using System;
using System.Threading.Tasks;
namespace Spider
{
class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("Hello World!");
try
{
var engine = SpiderServiceProvider
.GetServiceProvider()
.GetRequiredService<IDataCollectEngine>();
await engine.StartAsync();
}
catch (Exception e)
{
Console.WriteLine(e);
}
Console.Read();
}
}
}
{
"profiles": {
"Spider": {
"commandName": "Project",
"environmentVariables": {
"DOTNETCORE_ENVIRONMENT": "Development"
}
}
}
}
using OpenQA.Selenium;
namespace Spider.Services
{
public interface IWebDriverProvider
{
IWebDriver Get(Browsers browsers);
}
}
using OpenQA.Selenium;
using OpenQA.Selenium.Chrome;
using System;
namespace Spider.Services
{
public class WebDriverProvider : IWebDriverProvider
{
public IWebDriver Get(Browsers browsers)
{
var driver = default(IWebDriver);
switch (browsers)
{
case Browsers.Chrome:
var options = new ChromeOptions();
//options.AddArgument("start-maximized");
// overcome limited resource problems
//options.AddArgument("--disable-dev-shm-usage");
// disabling extensions
//options.AddArgument("--disable-extensions");
if (AppEnvironment.IsStaging()) // Linux
{
options.AddArgument("--no-sandbox");// Bypass OS security model
options.AddArgument("--disable-gpu");
options.AddArgument("--disable-extensions");
options.AddArgument("--headless");
}
var service = ChromeDriverService.CreateDefaultService();
driver = new ChromeDriver(service, options);
break;
case Browsers.PhantomJs:
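// PhantomJS support is not implemented yet, so driver remains null in this case.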
break;
default:
throw new ArgumentOutOfRangeException(nameof(browsers), Enum.GetNames(typeof(Browsers)), null);
}
return driver;
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.5" />
<PackageReference Include="Dora.Interception" Version="2.1.4" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Configuration" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
<PackageReference Include="MySql.Data" Version="8.0.14" />
<PackageReference Include="NPoco" Version="3.9.4" />
<PackageReference Include="Selenium.Support" Version="3.141.0" />
<PackageReference Include="Selenium.WebDriver" Version="3.141.0" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.Development.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="appsettings.Staging.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Spider.Core;
using System;
namespace Spider
{
public class SpiderServiceProvider
{
public static IServiceProvider GetServiceProvider()
{
var env = Environment.GetEnvironmentVariable("DOTNETCORE_ENVIRONMENT");
if (string.IsNullOrEmpty(env))
throw new Exception("Please set the application environment variable [DOTNETCORE_ENVIRONMENT] first.");
AppEnvironment.Name = env;
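// Layered configuration: appsettings.json is loaded first and then overridden by the environment-specific file.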
var config = new ConfigurationBuilder()
.SetBasePath(AppContext.BaseDirectory)
.AddJsonFile("appsettings.json")
.AddJsonFile($"appsettings.{AppEnvironment.Name}.json")
.Build();
var container = new ServiceCollection()
.AddLogging(logging =>
{
logging.AddConfiguration(config.GetSection("Logging"));
logging.ClearProviders();
logging.AddConsole();
})
.AddOptions()
.AddMemoryCache()
.AddSingleton<IConfiguration>(config)
.Configure<MySqlOptions>(config.GetSection(nameof(MySqlOptions)))
.Configure<DataCollectorOptions>(config.GetSection(nameof(DataCollectorOptions)));
Bootstrapper.RegisterServices(container);
return container.BuildInterceptableServiceProvider();
}
}
}
{
"MySqlOptions": {
"ConnectionString": "server=192.168.2.120;port=6033;user id=root;database=geekbuying_dev;password=123123;characterset=utf8;sslmode=none;"
}
}
{
"MySqlOptions": {
"ConnectionString": "server=172.24.83.222;port=6034;user id=root;database=geekbuying;password=aliyun123123;characterset=utf8;sslmode=none;"
}
}
{
"Logging": {
"LogLevel": {
"Default": "Debug"
},
"Console": {
"IncludeScopes": true,
"Enabled": true
}
},
"DataCollectorOptions": {
"TargetUrl": "https://www.gearbest.com",
"ProcessFactor": 1,
"CheckPointId": 0,
"SearchResultIndex": 1,
"SearchTimeOutSeconds": 5
}
}
{
"sdk": {
"version": "2.2.100"
}
}